Alternative approach for storing metadata (#29309)

This commit is contained in:
Christopher Batey 2020-06-25 14:52:55 +01:00
parent 08182bbdeb
commit e98f1311f3
26 changed files with 425 additions and 235 deletions

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.util
import java.util.concurrent.atomic.AtomicLong
import java.util.function.LongUnaryOperator
import akka.annotation.InternalApi
/**
* Always increasing wall clock time.
*/
@InternalApi
private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock {
override def currentTimeMillis(): Long = {
val currentSystemTime = System.currentTimeMillis()
updateAndGet {
new LongUnaryOperator {
override def applyAsLong(time: Long): Long = {
if (currentSystemTime <= time) time + 1
else currentSystemTime
}
}
}
}
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.util
import akka.annotation.ApiMayChange
/**
* A time source.
*/
@ApiMayChange
trait WallClock {
def currentTimeMillis(): Long
}
object WallClock {
/**
* Always increasing time source.
*/
val AlwaysIncreasingClock: WallClock = new AlwaysIncreasingClock()
}

View file

@ -79,7 +79,7 @@ final class EventEnvelope(
override def equals(obj: Any): Boolean = obj match { override def equals(obj: Any): Boolean = obj match {
case other: EventEnvelope => case other: EventEnvelope =>
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr && offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
event == other.event // timestamp not included in equals for backwards compatibility event == other.event // timestamp && metadata not included in equals for backwards compatibility
case _ => false case _ => false
} }

View file

@ -6,16 +6,12 @@ package akka.persistence.query.journal.leveldb
import akka.actor.Props import akka.actor.Props
import akka.persistence.PersistentActor import akka.persistence.PersistentActor
import akka.persistence.journal.EventWithMetaData
import akka.persistence.query.journal.leveldb.TestActor.WithMeta
object TestActor { object TestActor {
def props(persistenceId: String): Props = def props(persistenceId: String): Props =
Props(new TestActor(persistenceId)) Props(new TestActor(persistenceId))
case class DeleteCmd(toSeqNr: Long = Long.MaxValue) case class DeleteCmd(toSeqNr: Long = Long.MaxValue)
case class WithMeta(payload: String, meta: Any)
} }
class TestActor(override val persistenceId: String) extends PersistentActor { class TestActor(override val persistenceId: String) extends PersistentActor {
@ -30,10 +26,6 @@ class TestActor(override val persistenceId: String) extends PersistentActor {
case DeleteCmd(toSeqNr) => case DeleteCmd(toSeqNr) =>
deleteMessages(toSeqNr) deleteMessages(toSeqNr)
sender() ! s"$toSeqNr-deleted" sender() ! s"$toSeqNr-deleted"
case WithMeta(payload, meta) =>
persist(EventWithMetaData(payload, meta)) { _ =>
sender() ! s"$payload-done"
}
case cmd: String => case cmd: String =>
persist(cmd) { evt => persist(cmd) { evt =>

View file

@ -52,6 +52,12 @@ trait JournalCapabilityFlags extends CapabilityFlags {
*/ */
protected def supportsSerialization: CapabilityFlag protected def supportsSerialization: CapabilityFlag
/**
* When `true` enables tests which check if the Journal stores and returns
* metadata for an event
*/
protected def supportsMetadata: CapabilityFlag
} }
//#journal-flags //#journal-flags

View file

@ -54,6 +54,8 @@ abstract class JournalSpec(config: Config)
override protected def supportsSerialization: CapabilityFlag = true override protected def supportsSerialization: CapabilityFlag = true
override protected def supportsMetadata: CapabilityFlag = false
override protected def beforeEach(): Unit = { override protected def beforeEach(): Unit = {
super.beforeEach() super.beforeEach()
senderProbe = TestProbe() senderProbe = TestProbe()
@ -79,7 +81,7 @@ abstract class JournalSpec(config: Config)
extension.journalFor(null) extension.journalFor(null)
def replayedMessage(snr: Long, deleted: Boolean = false): ReplayedMessage = def replayedMessage(snr: Long, deleted: Boolean = false): ReplayedMessage =
ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid, 0L)) ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid, 0L, None))
def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = { def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = {
@ -112,7 +114,7 @@ abstract class JournalSpec(config: Config)
probe.expectMsg(WriteMessagesSuccessful) probe.expectMsg(WriteMessagesSuccessful)
(fromSnr to toSnr).foreach { i => (fromSnr to toSnr).foreach { i =>
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`, _), _) => case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`, _, _), _) =>
payload should be(s"a-${i}") payload should be(s"a-${i}")
} }
} }
@ -263,15 +265,15 @@ abstract class JournalSpec(config: Config)
val Pid = pid val Pid = pid
val WriterUuid = writerUuid val WriterUuid = writerUuid
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) =>
payload should be(s"b-6") payload should be(s"b-6")
} }
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid, _), _, _) => case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _, _) =>
payload should be(notSerializableEvent) payload should be(notSerializableEvent)
} }
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) =>
payload should be(s"b-8") payload should be(s"b-8")
} }
} }
@ -296,13 +298,13 @@ abstract class JournalSpec(config: Config)
val Pid = pid val Pid = pid
val WriterUuid = writerUuid val WriterUuid = writerUuid
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) =>
payload should be(event) payload should be(event)
} }
journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
receiverProbe.expectMsgPF() { receiverProbe.expectMsgPF() {
case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _)) => case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _)) =>
payload should be(event) payload should be(event)
} }
receiverProbe.expectMsgPF() { receiverProbe.expectMsgPF() {
@ -310,5 +312,34 @@ abstract class JournalSpec(config: Config)
} }
} }
} }
optional(flag = supportsMetadata) {
"return metadata" in {
val probe = TestProbe()
val event = TestPayload(probe.ref)
val meta = "meta-data"
val aw =
AtomicWrite(
PersistentRepr(
payload = event,
sequenceNr = 1L,
persistenceId = pid,
sender = Actor.noSender,
writerUuid = writerUuid).withMetadata(meta))
journal ! WriteMessages(List(aw), probe.ref, actorInstanceId)
probe.expectMsg(WriteMessagesSuccessful)
val Pid = pid
val WriterUuid = writerUuid
probe.expectMsgPF() {
case WriteMessageSuccess(
PersistentImpl(payload, 1L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`)),
_) =>
payload should be(event)
}
}
}
} }
} }

View file

@ -10,7 +10,6 @@ import scala.collection.immutable
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.testkit.EventStorage.{ JournalPolicies, Metadata }
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage import akka.persistence.testkit.internal.TestKitStorage
import akka.util.ccompat.JavaConverters._ import akka.util.ccompat.JavaConverters._
@ -19,7 +18,7 @@ import akka.util.ccompat.JavaConverters._
* INTERNAL API * INTERNAL API
*/ */
@InternalApi @InternalApi
private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (PersistentRepr, Metadata)] { private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, PersistentRepr] {
import EventStorage._ import EventStorage._
@ -31,10 +30,10 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe
// and therefore must be done at the same time with the update, not before // and therefore must be done at the same time with the update, not before
updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector) updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector)
override def reprToSeqNum(repr: (PersistentRepr, Metadata)): Long = repr._1.sequenceNr override def reprToSeqNum(repr: (PersistentRepr)): Long = repr.sequenceNr
def add(elems: immutable.Seq[(PersistentRepr, Metadata)]): Unit = def add(elems: immutable.Seq[PersistentRepr]): Unit =
elems.groupBy(_._1.persistenceId).foreach { gr => elems.groupBy(_.persistenceId).foreach { gr =>
add(gr._1, gr._2) add(gr._1, gr._2)
} }
@ -43,11 +42,11 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe
/** /**
* @throws Exception from StorageFailure in the current writing policy * @throws Exception from StorageFailure in the current writing policy
*/ */
def tryAdd(elems: immutable.Seq[(PersistentRepr, Metadata)]): Try[Unit] = { def tryAdd(elems: immutable.Seq[PersistentRepr]): Try[Unit] = {
val grouped = elems.groupBy(_._1.persistenceId) val grouped = elems.groupBy(_.persistenceId)
val processed = grouped.map { val processed = grouped.map {
case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_._1.payload))) case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload)))
} }
val reduced: ProcessingResult = val reduced: ProcessingResult =
@ -73,8 +72,8 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe
persistenceId: String, persistenceId: String,
fromSequenceNr: Long, fromSequenceNr: Long,
toSequenceNr: Long, toSequenceNr: Long,
max: Long): immutable.Seq[(PersistentRepr, Metadata)] = { max: Long): immutable.Seq[PersistentRepr] = {
val batch: immutable.Seq[(PersistentRepr, Metadata)] = read(persistenceId, fromSequenceNr, toSequenceNr, max) val batch = read(persistenceId, fromSequenceNr, toSequenceNr, max)
currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match { currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match {
case ProcessingSuccess => batch case ProcessingSuccess => batch
case Reject(ex) => throw ex case Reject(ex) => throw ex
@ -98,35 +97,15 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe
} }
} }
private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[(PersistentRepr, Metadata)] = { private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[PersistentRepr] = {
val sn = getHighestSeqNumber(key) + 1 val sn = getHighestSeqNumber(key) + 1
elems.zipWithIndex.map(p => (PersistentRepr(p._1, p._2 + sn, key), NoMetadata)) elems.zipWithIndex.map(p => PersistentRepr(p._1, p._2 + sn, key))
} }
} }
object EventStorage { object EventStorage {
object JournalPolicies extends DefaultPolicies[JournalOperation] object JournalPolicies extends DefaultPolicies[JournalOperation]
/**
* INTERNAL API
*/
@InternalApi
private[testkit] sealed trait Metadata
/**
* INTERNAL API
*/
@InternalApi
private[testkit] case object NoMetadata extends Metadata
/**
* INTERNAL API
*/
@InternalApi
private[testkit] final case class WithMetadata(payload: Any) extends Metadata
} }
/** /**

View file

@ -10,9 +10,8 @@ import scala.util.Try
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.persistence.snapshot.SnapshotStore import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
/** /**
@ -26,32 +25,30 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal {
private final val storage = InMemStorageExtension(context.system) private final val storage = InMemStorageExtension(context.system)
private val eventStream = context.system.eventStream private val eventStream = context.system.eventStream
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
Future.fromTry(Try(messages.map(aw => { Future.fromTry(Try(messages.map(aw => {
val data = aw.payload.map(pl => val data = aw.payload.map(pl =>
pl.payload match { pl.payload match {
// TODO define how to handle tagged and metadata case Tagged(p, _) => pl.withPayload(p).withTimestamp(System.currentTimeMillis())
case Tagged(p, _) => (pl.withPayload(p).withTimestamp(System.currentTimeMillis()), NoMetadata) case _ => pl.withTimestamp(System.currentTimeMillis())
case evt: EventWithMetaData =>
(pl.withPayload(evt.event).withTimestamp(System.currentTimeMillis()), WithMetadata(evt.metaData))
case _ => (pl.withTimestamp(System.currentTimeMillis()), NoMetadata)
}) })
val result: Try[Unit] = storage.tryAdd(data) val result: Try[Unit] = storage.tryAdd(data)
result.foreach { _ => result.foreach { _ =>
messages.foreach(aw => messages.foreach { aw =>
eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr))) eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr))
}
} }
result result
}))) })))
}
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
Future.fromTry(Try(storage.tryDelete(persistenceId, toSequenceNr))) Future.fromTry(Try(storage.tryDelete(persistenceId, toSequenceNr)))
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] = recoveryCallback: PersistentRepr => Unit): Future[Unit] =
Future.fromTry( Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback)))
Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(recoveryCallback)))
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
Future.fromTry(Try { Future.fromTry(Try {

View file

@ -8,7 +8,6 @@ import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.testkit.EventStorage import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.EventStorage.Metadata
import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized
import akka.serialization.{ Serialization, SerializationExtension, Serializers } import akka.serialization.{ Serialization, SerializationExtension, Serializers }
@ -21,7 +20,7 @@ private[testkit] object SerializedEventStorageImpl {
payloadSerManifest: String, payloadSerManifest: String,
writerUuid: String, writerUuid: String,
payload: Array[Byte], payload: Array[Byte],
metadata: Metadata) metadata: Option[Any])
} }
/** /**
@ -37,23 +36,28 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
/** /**
* @return (serializer id, serialized bytes) * @return (serializer id, serialized bytes)
*/ */
override def toInternal(repr: (PersistentRepr, Metadata)): Serialized = override def toInternal(pr: PersistentRepr): Serialized =
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
val (pr, meta) = repr
val payload = pr.payload.asInstanceOf[AnyRef] val payload = pr.payload.asInstanceOf[AnyRef]
val s = serialization.findSerializerFor(payload) val s = serialization.findSerializerFor(payload)
val manifest = Serializers.manifestFor(s, payload) val manifest = Serializers.manifestFor(s, payload)
Serialized(pr.persistenceId, pr.sequenceNr, s.identifier, manifest, pr.writerUuid, s.toBinary(payload), meta) Serialized(
pr.persistenceId,
pr.sequenceNr,
s.identifier,
manifest,
pr.writerUuid,
s.toBinary(payload),
pr.metadata)
} }
/** /**
* @param internal (serializer id, serialized bytes) * @param internal (serializer id, serialized bytes)
*/ */
override def toRepr(internal: Serialized): (PersistentRepr, Metadata) = { override def toRepr(internal: Serialized): PersistentRepr = {
val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get
( val pr = PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid)
PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid), internal.metadata.fold(pr)(meta => pr.withMetadata(meta))
internal.metadata)
} }
} }

View file

@ -7,7 +7,6 @@ package akka.persistence.testkit.internal
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.testkit.EventStorage import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.EventStorage.Metadata
/** /**
* INTERNAL API * INTERNAL API
@ -15,10 +14,10 @@ import akka.persistence.testkit.EventStorage.Metadata
@InternalApi @InternalApi
private[testkit] class SimpleEventStorageImpl extends EventStorage { private[testkit] class SimpleEventStorageImpl extends EventStorage {
override type InternalRepr = (PersistentRepr, Metadata) override type InternalRepr = PersistentRepr
override def toInternal(repr: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = repr override def toInternal(repr: PersistentRepr): PersistentRepr = repr
override def toRepr(internal: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = internal override def toRepr(internal: PersistentRepr): PersistentRepr = internal
} }

View file

@ -7,7 +7,6 @@ import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin } import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin }
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.stream.{ Attributes, Outlet, SourceShape } import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageLogicWithLogging, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageLogicWithLogging, OutHandler }
@ -37,7 +36,6 @@ final private[akka] class EventsByPersistenceIdStage(
val (_, msg) = in val (_, msg) = in
msg match { msg match {
case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == persistenceId => case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == persistenceId =>
log.debug("Write notification {} {}", pid, toSequenceNr)
if (toSequenceNr >= currentSequenceNr) { if (toSequenceNr >= currentSequenceNr) {
tryPush() tryPush()
} }
@ -50,7 +48,7 @@ final private[akka] class EventsByPersistenceIdStage(
val event = storage.tryRead(persistenceId, currentSequenceNr, currentSequenceNr, 1) val event = storage.tryRead(persistenceId, currentSequenceNr, currentSequenceNr, 1)
log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event) log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event)
event.headOption match { event.headOption match {
case Some((pr, meta)) => case Some(pr) =>
push( push(
out, out,
EventEnvelope( EventEnvelope(
@ -59,10 +57,7 @@ final private[akka] class EventsByPersistenceIdStage(
pr.sequenceNr, pr.sequenceNr,
pr.payload, pr.payload,
pr.timestamp, pr.timestamp,
meta match { pr.metadata))
case NoMetadata => None
case WithMetadata(m) => Some(m)
}))
if (currentSequenceNr == toSequenceNr) { if (currentSequenceNr == toSequenceNr) {
completeStage() completeStage()
} else { } else {

View file

@ -8,7 +8,6 @@ import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.testkit.EventStorage import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
@ -22,7 +21,7 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem)
with EventsByPersistenceIdQuery with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery { with CurrentEventsByPersistenceIdQuery {
private final val storage: EventStorage = InMemStorageExtension(system) private val storage: EventStorage = InMemStorageExtension(system)
override def eventsByPersistenceId( override def eventsByPersistenceId(
persistenceId: String, persistenceId: String,
@ -35,12 +34,8 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem)
persistenceId: String, persistenceId: String,
fromSequenceNr: Long, fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { pr =>
case (pr, meta) => EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, pr.metadata)
EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, meta match {
case NoMetadata => None
case WithMetadata(payload) => Some(payload)
})
} }
} }
} }

View file

@ -18,7 +18,6 @@ import akka.annotation.ApiMayChange
import akka.persistence.Persistence import akka.persistence.Persistence
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.SnapshotMetadata import akka.persistence.SnapshotMetadata
import akka.persistence.testkit.EventStorage.Metadata
import akka.persistence.testkit._ import akka.persistence.testkit._
import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension
@ -423,9 +422,9 @@ object SnapshotTestKit {
*/ */
@ApiMayChange @ApiMayChange
class PersistenceTestKit(system: ActorSystem) class PersistenceTestKit(system: ActorSystem)
extends PersistenceTestKitOps[(PersistentRepr, Metadata), JournalOperation] extends PersistenceTestKitOps[PersistentRepr, JournalOperation]
with ExpectOps[(PersistentRepr, Metadata)] with ExpectOps[PersistentRepr]
with HasStorage[JournalOperation, (PersistentRepr, Metadata)] { with HasStorage[JournalOperation, PersistentRepr] {
require( require(
Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess, Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess,
"The test persistence plugin is not configured.") "The test persistence plugin is not configured.")
@ -494,7 +493,7 @@ class PersistenceTestKit(system: ActorSystem)
def persistedInStorage(persistenceId: String): immutable.Seq[Any] = def persistedInStorage(persistenceId: String): immutable.Seq[Any] =
storage.read(persistenceId).getOrElse(List.empty).map(reprToAny) storage.read(persistenceId).getOrElse(List.empty).map(reprToAny)
override private[testkit] def reprToAny(repr: (PersistentRepr, Metadata)): Any = repr._1.payload override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload
} }
@ApiMayChange @ApiMayChange

View file

@ -4,16 +4,14 @@
package akka.persistence.testkit.query package akka.persistence.testkit.query
import akka.{ Done, NotUsed } import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.persistence.journal.EventWithMetaData
import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.{ EventAdapter, EventSeq, PersistenceId } import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
@ -40,20 +38,7 @@ object EventsByPersistenceIdSpec {
command.ack ! Done command.ack ! Done
}, },
(state, _) => state) (state, _) => state)
}.eventAdapter(new EventAdapter[String, Any] {
override def toJournal(e: String): Any = {
if (e.startsWith("m")) {
EventWithMetaData(e, s"$e-meta")
} else {
e
} }
}
override def manifest(event: String): String = ""
override def fromJournal(p: Any, manifest: String): EventSeq[String] = p match {
case e: EventWithMetaData => EventSeq.single(e.event.toString)
case _ => EventSeq.single(p.toString)
}
})
} }
@ -148,22 +133,5 @@ class EventsByPersistenceIdSpec
probe.cancel() probe.cancel()
} }
"return metadata in queries" in {
val ackProbe = createTestProbe[Done]()
val ref = setupEmpty("with-meta")
ref ! Command("m-1", ackProbe.ref)
ref ! Command("m-2", ackProbe.ref)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("with-meta", 0L, Long.MaxValue)
val probe =
src.runWith(TestSink.probe[Any]).request(3)
probe.expectNextPF {
case e @ EventEnvelope(_, "with-meta", 1L, "m-1") if e.eventMetadata.contains("m-1-meta") =>
}
probe.expectNextPF {
case e @ EventEnvelope(_, "with-meta", 2L, "m-2") if e.eventMetadata.contains("m-2-meta") =>
}
}
} }
} }

View file

@ -37,6 +37,7 @@ class PersistenceTestKitJournalCompatSpec extends JournalSpec(config = Persisten
} }
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true
override protected def supportsMetadata: CapabilityFlag = true
} }
class PersistenceTestKitSnapshotStoreCompatSpec class PersistenceTestKitSnapshotStoreCompatSpec

View file

@ -0,0 +1,148 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveSpec {
val AllReplicas = Set("R1", "R2", "R3")
sealed trait Command
case class GetState(replyTo: ActorRef[State]) extends Command
case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command
case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command
case class State(all: List[String])
def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] =
testBehavior(entityId, replicaId, Some(probe))
def testBehavior(
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ActiveActiveEventSourcing(entityId, replicaId, AllReplicas, PersistenceTestKitReadJournal.Identifier)(
aaContext =>
EventSourcedBehavior[Command, String, State](
aaContext.persistenceId,
State(Nil),
(state, command) =>
command match {
case GetState(replyTo) =>
replyTo ! state
Effect.none
case GetReplica(replyTo) =>
replyTo.tell((aaContext.replicaId, aaContext.allReplicas))
Effect.none
case StoreMe(evt, ack) =>
Effect.persist(evt).thenRun(_ => ack ! Done)
},
(state, event) => {
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning))
state.copy(all = event :: state.all)
}))
}
case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean = false)
class ActiveActiveSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ActiveActiveSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
"ActiveActiveEventSourcing" should {
"replicate events between entities" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1"))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreMe("from r1", probe.ref)
r2 ! StoreMe("from r2", probe.ref)
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2")
}
eventually {
val probe = createTestProbe[State]()
r2 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2")
}
}
"get all events in recovery" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1"))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreMe("from r1", probe.ref)
r2 ! StoreMe("from r2", probe.ref)
r1 ! StoreMe("from r1 again", probe.ref)
val r3 = spawn(testBehavior(entityId, "R3"))
eventually {
val probe = createTestProbe[State]()
r3 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2", "from r1 again")
}
}
"have access to replica information" in {
val entityId = nextEntityId
val probe = createTestProbe[(String, Set[String])]()
val r1 = spawn(testBehavior(entityId, "R1"))
r1 ! GetReplica(probe.ref)
probe.expectMessage(("R1", Set("R1", "R2", "R3")))
}
"have access to event origin" in {
val entityId = nextEntityId
val replyProbe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val eventProbeR2 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1", replyProbe.ref)
eventProbeR2.expectMessage(EventAndContext("from r1", "R1"))
eventProbeR1.expectMessage(EventAndContext("from r1", "R1"))
r2 ! StoreMe("from r2", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("from r2", "R2"))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2"))
}
"set recovery running" in {
val entityId = nextEntityId
val eventProbeR1 = createTestProbe[EventAndContext]()
val replyProbe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
r1 ! StoreMe("Event", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false))
replyProbe.expectMessage(Done)
val recoveryProbe = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R1", recoveryProbe.ref))
recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true))
}
"persist all" in {
pending
}
}
}

View file

@ -61,13 +61,19 @@ class AABlogExampleSpec
cmd match { cmd match {
case AddPost(_, content, replyTo) => case AddPost(_, content, replyTo) =>
val evt = val evt =
PostAdded(aa.persistenceId.id, content, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) PostAdded(
aa.persistenceId.id,
content,
state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId))
Effect.persist(evt).thenRun { _ => Effect.persist(evt).thenRun { _ =>
replyTo ! AddPostDone(aa.entityId) replyTo ! AddPostDone(aa.entityId)
} }
case ChangeBody(_, newContent, replyTo) => case ChangeBody(_, newContent, replyTo) =>
val evt = val evt =
BodyChanged(aa.persistenceId.id, newContent, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) BodyChanged(
aa.persistenceId.id,
newContent,
state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId))
Effect.persist(evt).thenRun { _ => Effect.persist(evt).thenRun { _ =>
replyTo ! Done replyTo ! Done
} }

View file

@ -242,7 +242,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
} }
override private[akka] def withActiveActive( override private[akka] def withActiveActive(
context: ActiveActiveContext, context: ActiveActiveContextImpl,
id: String, id: String,
allIds: Set[String], allIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State] = { queryPluginId: String): EventSourcedBehavior[Command, Event, State] = {
@ -265,7 +265,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
// FIXME serializer // FIXME serializer
@InternalApi @InternalApi
private[akka] final case class ReplicatedEventMetaData(originDc: String) private[akka] final case class ReplicatedEventMetaData(originReplica: String, originSequenceNr: Long)
@InternalApi @InternalApi
private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long)
@InternalApi @InternalApi

View file

@ -5,7 +5,6 @@
package akka.persistence.typed.internal package akka.persistence.typed.internal
import scala.collection.immutable import scala.collection.immutable
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.PostStop import akka.actor.typed.PostStop
@ -19,7 +18,7 @@ import akka.annotation.InternalStableApi
import akka.persistence._ import akka.persistence._
import akka.persistence.JournalProtocol.ReplayMessages import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence.SnapshotProtocol.LoadSnapshot import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.util.unused import akka.util.{ unused, OptionVal }
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
@ -34,7 +33,8 @@ private[akka] trait JournalInteractions[C, E, S] {
cmd: Any, cmd: Any,
state: Running.RunningState[S], state: Running.RunningState[S],
event: EventOrTaggedOrReplicated, event: EventOrTaggedOrReplicated,
eventAdapterManifest: String): Running.RunningState[S] = { eventAdapterManifest: String,
metadata: OptionVal[Any]): Running.RunningState[S] = {
val newRunningState = state.nextSequenceNr() val newRunningState = state.nextSequenceNr()
@ -50,7 +50,11 @@ private[akka] trait JournalInteractions[C, E, S] {
// https://github.com/akka/akka/issues/29262 // https://github.com/akka/akka/issues/29262
onWriteInitiated(ctx, cmd, repr) onWriteInitiated(ctx, cmd, repr)
val write = AtomicWrite(repr) :: Nil val write = AtomicWrite(metadata match {
case OptionVal.Some(meta) => repr.withMetadata(meta)
case OptionVal.None => repr
}) :: Nil
setup.journal setup.journal
.tell(JournalProtocol.WriteMessages(write, setup.selfClassic, setup.writerIdentity.instanceId), setup.selfClassic) .tell(JournalProtocol.WriteMessages(write, setup.selfClassic, setup.writerIdentity.instanceId), setup.selfClassic)

View file

@ -6,7 +6,6 @@ package akka.persistence.typed.internal
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException import akka.actor.typed.internal.UnstashException
@ -120,7 +119,21 @@ private[akka] final class ReplayingEvents[C, E, S](
def handleEvent(event: E): Unit = { def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event) eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr) state = state.copy(seqNr = repr.sequenceNr)
state = state.copy(state = setup.eventHandler(state.state, event), eventSeenInInterval = true)
setup.activeActive match {
case Some(aa) =>
val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetaData]
case None =>
throw new IllegalStateException(
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")
}
aa.setContext(recoveryRunning = true, meta.originReplica)
case None =>
}
val newState = setup.eventHandler(state.state, event)
state = state.copy(state = newState, eventSeenInInterval = true)
} }
eventSeq match { eventSeq match {
@ -240,7 +253,9 @@ private[akka] final class ReplayingEvents[C, E, S](
Behaviors.stopped Behaviors.stopped
else { else {
val seenPerReplica: Map[String, Long] = val seenPerReplica: Map[String, Long] =
setup.activeActive.map(aa => aa.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty) setup.activeActive
.map(aa => aa.allReplicas.filterNot(_ == aa.replicaId).map(replica => replica -> 0L).toMap)
.getOrElse(Map.empty)
val running = val running =
Running[C, E, S]( Running[C, E, S](
setup, setup,

View file

@ -23,7 +23,7 @@ import akka.persistence.PersistentRepr
import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess import akka.persistence.SaveSnapshotSuccess
import akka.persistence.SnapshotProtocol import akka.persistence.SnapshotProtocol
import akka.persistence.journal.{ EventWithMetaData, Tagged } import akka.persistence.journal.Tagged
import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.persistence.typed.{ import akka.persistence.typed.{
@ -47,7 +47,7 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.{ SharedKillSwitch, SystemMaterializer } import akka.stream.{ SharedKillSwitch, SystemMaterializer }
import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow import akka.stream.typed.scaladsl.ActorFlow
import akka.util.{ unused, Timeout } import akka.util.{ unused, OptionVal, Timeout }
/** /**
* INTERNAL API * INTERNAL API
@ -116,15 +116,16 @@ private[akka] object Running {
implicit val timeout = Timeout(30.seconds) implicit val timeout = Timeout(30.seconds)
// FIXME config
val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
(eventEnvelope, replyTo) => (eventEnvelope, replyTo) =>
val re = ReplicatedEvent[E]( // Need to handle this not being available migration from non-active-active is supported
eventEnvelope.event.asInstanceOf[E], val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData]
eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originDc, val re =
eventEnvelope.sequenceNr) // FIXME, this is the wrong sequence nr, we need origin sequence nr, follow up with tests that show this ReplicatedEvent[E](eventEnvelope.event.asInstanceOf[E], meta.originReplica, meta.originSequenceNr)
ReplicatedEventEnvelope(re, replyTo) ReplicatedEventEnvelope(re, replyTo)
}) })
} }
@ -161,7 +162,7 @@ private[akka] object Running {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c) case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re) case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.activeActive.get)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
case get: GetState[S @unchecked] => onGetState(get) case get: GetState[S @unchecked] => onGetState(get)
@ -186,18 +187,20 @@ private[akka] object Running {
def onReplicatedEvent( def onReplicatedEvent(
state: Running.RunningState[S], state: Running.RunningState[S],
envelope: ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { envelope: ReplicatedEventEnvelope[E],
// FIXME set the details on the context https://github.com/akka/akka/issues/29258 activeActive: ActiveActive): Behavior[InternalProtocol] = {
setup.log.infoN( setup.log.infoN(
"Replica {} received replicated event. Replica seqs nrs: {}", "Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}",
setup.activeActive, setup.activeActive,
state.seenPerReplica) state.seenPerReplica,
envelope)
envelope.ack ! ReplicatedEventAck envelope.ack ! ReplicatedEventAck
if (envelope.event.originReplica != setup.activeActive.get.replicaId && !alreadySeen(envelope.event)) { if (envelope.event.originReplica != activeActive.replicaId && !alreadySeen(envelope.event)) {
setup.log.info("Saving event as first time") activeActive.setContext(false, envelope.event.originReplica)
handleReplicatedEventPersist(envelope.event) setup.log.debug("Saving event as first time")
handleExternalReplicatedEventPersist(envelope.event)
} else { } else {
setup.log.info("Filtering event as already seen") setup.log.debug("Filtering event as already seen")
this this
} }
} }
@ -208,11 +211,16 @@ private[akka] object Running {
this this
} }
private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { private def handleExternalReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1 _currentSequenceNumber = state.seqNr + 1
val replicatedEvent = new EventWithMetaData(event.event, ReplicatedEventMetaData(event.originReplica))
val newState: RunningState[S] = state.applyEvent(setup, event.event) val newState: RunningState[S] = state.applyEvent(setup, event.event)
val newState2: RunningState[S] = internalPersist(setup.context, null, newState, replicatedEvent, "") val newState2: RunningState[S] = internalPersist(
setup.context,
null,
newState,
event.event,
"",
OptionVal.Some(ReplicatedEventMetaData(event.originReplica, event.originSequenceNr)))
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
@ -224,6 +232,37 @@ private[akka] object Running {
Nil) Nil)
} }
private def handleEventPersist(event: E, cmd: Any, sideEffects: immutable.Seq[SideEffect[S]]) = {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1
setup.activeActive.foreach { aa =>
aa.setContext(recoveryRunning = false, aa.replicaId)
}
val newState: RunningState[S] = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.activeActive match {
case Some(aa) =>
internalPersist(
setup.context,
cmd,
newState,
eventToPersist,
eventAdapterManifest,
OptionVal.Some(ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber)))
case None =>
internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None)
}
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
(persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false)
}
@tailrec def applyEffects( @tailrec def applyEffects(
msg: Any, msg: Any,
state: RunningState[S], state: RunningState[S],
@ -242,27 +281,7 @@ private[akka] object Running {
applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)
case Persist(event) => case Persist(event) =>
// apply the event before persist so that validation exception is handled before persisting handleEventPersist(event, msg, sideEffects)
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1
val newState: RunningState[S] = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.activeActive match {
case Some(aa) =>
val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber)
internalPersist(setup.context, msg, newState, replicatedEvent, eventAdapterManifest)
case None =>
internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)
}
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
(persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false)
case PersistAll(events) => case PersistAll(events) =>
if (events.nonEmpty) { if (events.nonEmpty) {
// apply the event before persist so that validation exception is handled before persisting // apply the event before persist so that validation exception is handled before persisting

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.scaladsl package akka.persistence.typed.scaladsl
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.util.WallClock
/** /**
* Utility class for comparing timestamp and data center * Utility class for comparing timestamp and data center
@ -34,8 +35,9 @@ final case class LwwTime(timestamp: Long, originDc: String) {
} }
} }
// FIXME docs
trait ActiveActiveContext { trait ActiveActiveContext {
def timestamp: Long
def origin: String def origin: String
def concurrent: Boolean def concurrent: Boolean
def replicaId: String def replicaId: String
@ -44,6 +46,7 @@ trait ActiveActiveContext {
def recoveryRunning: Boolean def recoveryRunning: Boolean
def entityId: String def entityId: String
def currentTimeMillis(): Long def currentTimeMillis(): Long
} }
// FIXME, parts of this can be set during initialisation // FIXME, parts of this can be set during initialisation
@ -51,18 +54,11 @@ trait ActiveActiveContext {
// https://github.com/akka/akka/issues/29258 // https://github.com/akka/akka/issues/29258
private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String]) private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String])
extends ActiveActiveContext { extends ActiveActiveContext {
var _timestamp: Long = -1
var _origin: String = null var _origin: String = null
var _concurrent: Boolean = false var _recoveryRunning: Boolean = false
// FIXME check illegal access https://github.com/akka/akka/issues/29264 // FIXME check illegal access https://github.com/akka/akka/issues/29264
/**
* The timestamp of the event. Always increases per data center
* Undefined result if called from any where other than an event handler.
*/
override def timestamp: Long = _timestamp
/** /**
* The origin of the current event. * The origin of the current event.
* Undefined result if called from anywhere other than an event handler. * Undefined result if called from anywhere other than an event handler.
@ -73,13 +69,14 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId:
* Whether the happened concurrently with an event from another replica. * Whether the happened concurrently with an event from another replica.
* Undefined result if called from any where other than an event handler. * Undefined result if called from any where other than an event handler.
*/ */
override def concurrent: Boolean = _concurrent override def concurrent: Boolean = throw new UnsupportedOperationException("TODO")
override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId) override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId)
override def currentTimeMillis(): Long = { override def currentTimeMillis(): Long = {
// FIXME always increasing WallClock.AlwaysIncreasingClock.currentTimeMillis()
System.currentTimeMillis()
} }
override def recoveryRunning: Boolean = false override def recoveryRunning: Boolean = _recoveryRunning
} }
object ActiveActiveEventSourcing { object ActiveActiveEventSourcing {

View file

@ -25,8 +25,18 @@ object EventSourcedBehavior {
private[akka] case class ActiveActive( private[akka] case class ActiveActive(
replicaId: String, replicaId: String,
allReplicas: Set[String], allReplicas: Set[String],
aaContext: ActiveActiveContext, aaContext: ActiveActiveContextImpl,
queryPluginId: String) queryPluginId: String) {
/**
* Must only be called on the same thread that will execute the user code
*/
def setContext(recoveryRunning: Boolean, originReplica: String): Unit = {
aaContext._recoveryRunning = recoveryRunning
aaContext._origin = originReplica
}
}
/** /**
* Type alias for the command handler function that defines how to act on commands. * Type alias for the command handler function that defines how to act on commands.
@ -152,7 +162,7 @@ object EventSourcedBehavior {
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State]
private[akka] def withActiveActive( private[akka] def withActiveActive(
context: ActiveActiveContext, context: ActiveActiveContextImpl,
replicaId: String, replicaId: String,
allReplicaIds: Set[String], allReplicaIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State] queryPluginId: String): EventSourcedBehavior[Command, Event, State]

View file

@ -102,6 +102,10 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per
def withTimestamp(newTimestamp: Long): PersistentRepr def withTimestamp(newTimestamp: Long): PersistentRepr
def metadata: Option[Any]
def withMetadata(metadata: Any): PersistentRepr
/** /**
* Unique identifier of the writing persistent actor. * Unique identifier of the writing persistent actor.
* Used to detect anomalies with overlapping writes from multiple * Used to detect anomalies with overlapping writes from multiple
@ -163,7 +167,7 @@ object PersistentRepr {
deleted: Boolean = false, deleted: Boolean = false,
sender: ActorRef = null, sender: ActorRef = null,
writerUuid: String = PersistentRepr.Undefined): PersistentRepr = writerUuid: String = PersistentRepr.Undefined): PersistentRepr =
PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid, 0L) PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid, 0L, None)
/** /**
* Java API, Plugin API. * Java API, Plugin API.
@ -188,7 +192,8 @@ private[persistence] final case class PersistentImpl(
override val deleted: Boolean, override val deleted: Boolean,
override val sender: ActorRef, override val sender: ActorRef,
override val writerUuid: String, override val writerUuid: String,
override val timestamp: Long) override val timestamp: Long,
override val metadata: Option[Any])
extends PersistentRepr extends PersistentRepr
with NoSerializationVerificationNeeded { with NoSerializationVerificationNeeded {
@ -203,6 +208,10 @@ private[persistence] final case class PersistentImpl(
if (this.timestamp == newTimestamp) this if (this.timestamp == newTimestamp) this
else copy(timestamp = newTimestamp) else copy(timestamp = newTimestamp)
override def withMetadata(metadata: Any): PersistentRepr = {
copy(metadata = Some(metadata))
}
def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef, writerUuid: String) = def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef, writerUuid: String) =
copy( copy(
sequenceNr = sequenceNr, sequenceNr = sequenceNr,
@ -221,6 +230,7 @@ private[persistence] final case class PersistentImpl(
result = HashCode.hash(result, sender) result = HashCode.hash(result, sender)
result = HashCode.hash(result, writerUuid) result = HashCode.hash(result, writerUuid)
// timestamp not included in equals for backwards compatibility // timestamp not included in equals for backwards compatibility
// meta not included in equals for backwards compatibility
result result
} }
@ -235,5 +245,4 @@ private[persistence] final case class PersistentImpl(
override def toString: String = { override def toString: String = {
s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp)" s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp)"
} }
} }

View file

@ -1,33 +0,0 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.journal
object EventWithMetaData {
def apply(event: Any, metaData: Any): EventWithMetaData = new EventWithMetaData(event, metaData)
/**
* If meta data could not be deserialized it will not fail the replay/query.
* The "invalid" meta data is represented with this `UnknownMetaData` and
* it and the event will be wrapped in `EventWithMetaData`.
*
* The reason for not failing the replay/query is that meta data should be
* optional, e.g. the tool that wrote the meta data has been removed. This
* is typically because the serializer for the meta data has been removed
* from the class path (or configuration).
*/
final class UnknownMetaData(val serializerId: Int, val manifest: String)
}
/**
* If the event is wrapped in this class the `metaData` will
* be serialized and stored separately from the event payload. This can be used by event
* adapters or other tools to store additional meta data without altering
* the actual domain event.
*
* Check the documentation of the persistence plugin in use to use if it supports
* EventWithMetaData.
*/
final class EventWithMetaData(val event: Any, val metaData: Any)

View file

@ -19,7 +19,7 @@ import akka.persistence.AtomicWrite
import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.RecoverySuccess
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.journal.inmem.InmemJournal.{ MessageWithMeta, ReplayWithMeta } import akka.persistence.journal.inmem.InmemJournal.{ MessageWithMeta, ReplayWithMeta }
import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.serialization.Serializers import akka.serialization.Serializers
import akka.util.OptionVal import akka.util.OptionVal
@ -143,12 +143,9 @@ object InmemJournal {
// persistenceId -> highest used sequence number // persistenceId -> highest used sequence number
private var highestSequenceNumbers = Map.empty[String, Long] private var highestSequenceNumbers = Map.empty[String, Long]
// FIXME, which way around should Tagged/EventWithMeta go? https://github.com/akka/akka/issues/29284
def add(p: PersistentRepr): Unit = { def add(p: PersistentRepr): Unit = {
val pr = p.payload match { val pr = p.payload match {
case Tagged(payload, _) => (p.withPayload(payload).withTimestamp(System.currentTimeMillis()), OptionVal.None) case Tagged(payload, _) => (p.withPayload(payload).withTimestamp(System.currentTimeMillis()), OptionVal.None)
case meta: EventWithMetaData =>
(p.withPayload(meta.event).withTimestamp(System.currentTimeMillis()), OptionVal.Some(meta.metaData))
case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None) case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None)
} }