ActiveActive: Events with metadata and events by persistence id for (#29287)

This commit is contained in:
Christopher Batey 2020-06-24 15:37:20 +01:00
parent ad2d7e2d00
commit 08182bbdeb
29 changed files with 657 additions and 116 deletions

View file

@ -4,14 +4,26 @@
package akka.persistence.query package akka.persistence.query
import scala.runtime.AbstractFunction4 import java.util.Optional
import akka.annotation.InternalApi
import scala.runtime.AbstractFunction4
import akka.util.HashCode import akka.util.HashCode
// for binary compatibility (used to be a case class) // for binary compatibility (used to be a case class)
object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] { object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {
def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope = def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, None)
def apply(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any,
timestamp: Long,
meta: Option[Any]): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, meta)
@deprecated("for binary compatibility", "2.6.2") @deprecated("for binary compatibility", "2.6.2")
override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope = override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope =
@ -34,13 +46,26 @@ final class EventEnvelope(
val persistenceId: String, val persistenceId: String,
val sequenceNr: Long, val sequenceNr: Long,
val event: Any, val event: Any,
val timestamp: Long) val timestamp: Long,
val eventMetadata: Option[Any])
extends Product4[Offset, String, Long, Any] extends Product4[Offset, String, Long, Any]
with Serializable { with Serializable {
@deprecated("for binary compatibility", "2.6.2") @deprecated("for binary compatibility", "2.6.2")
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) = def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) =
this(offset, persistenceId, sequenceNr, event, 0L) this(offset, persistenceId, sequenceNr, event, 0L, None)
// bin compat 2.6.7
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long) =
this(offset, persistenceId, sequenceNr, event, timestamp, None)
/**
* Java API
*/
def getEventMetaData(): Optional[Any] = {
import scala.compat.java8.OptionConverters._
eventMetadata.asJava
}
override def hashCode(): Int = { override def hashCode(): Int = {
var result = HashCode.SEED var result = HashCode.SEED
@ -59,7 +84,7 @@ final class EventEnvelope(
} }
override def toString: String = override def toString: String =
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp)" s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp,$eventMetadata)"
// for binary compatibility (used to be a case class) // for binary compatibility (used to be a case class)
def copy( def copy(
@ -67,7 +92,11 @@ final class EventEnvelope(
persistenceId: String = this.persistenceId, persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr, sequenceNr: Long = this.sequenceNr,
event: Any = this.event): EventEnvelope = event: Any = this.event): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this.eventMetadata)
@InternalApi
private[akka] def withMetadata(metadata: Any): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, Some(metadata))
// Product4, for binary compatibility (used to be a case class) // Product4, for binary compatibility (used to be a case class)
override def productPrefix = "EventEnvelope" override def productPrefix = "EventEnvelope"

View file

@ -5,7 +5,6 @@
package akka.persistence.query.journal.leveldb package akka.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.RecoverySuccess

View file

@ -5,7 +5,6 @@
package akka.persistence.query.journal.leveldb package akka.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.RecoverySuccess

View file

@ -6,12 +6,16 @@ package akka.persistence.query.journal.leveldb
import akka.actor.Props import akka.actor.Props
import akka.persistence.PersistentActor import akka.persistence.PersistentActor
import akka.persistence.journal.EventWithMetaData
import akka.persistence.query.journal.leveldb.TestActor.WithMeta
object TestActor { object TestActor {
def props(persistenceId: String): Props = def props(persistenceId: String): Props =
Props(new TestActor(persistenceId)) Props(new TestActor(persistenceId))
case class DeleteCmd(toSeqNr: Long = Long.MaxValue) case class DeleteCmd(toSeqNr: Long = Long.MaxValue)
case class WithMeta(payload: String, meta: Any)
} }
class TestActor(override val persistenceId: String) extends PersistentActor { class TestActor(override val persistenceId: String) extends PersistentActor {
@ -26,10 +30,14 @@ class TestActor(override val persistenceId: String) extends PersistentActor {
case DeleteCmd(toSeqNr) => case DeleteCmd(toSeqNr) =>
deleteMessages(toSeqNr) deleteMessages(toSeqNr)
sender() ! s"$toSeqNr-deleted" sender() ! s"$toSeqNr-deleted"
case WithMeta(payload, meta) =>
persist(EventWithMetaData(payload, meta)) { _ =>
sender() ! s"$payload-done"
}
case cmd: String => case cmd: String =>
persist(cmd) { evt => persist(cmd) { evt =>
sender() ! evt + "-done" sender() ! s"$evt-done"
} }
} }

View file

@ -28,3 +28,7 @@ akka.persistence.testkit {
} }
} }
akka.persistence.testkit.query {
class = "akka.persistence.testkit.query.PersistenceTestKitReadJournalProvider"
}

View file

@ -8,9 +8,9 @@ import java.util.{ List => JList }
import scala.collection.immutable import scala.collection.immutable
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.testkit.EventStorage.{ JournalPolicies, Metadata }
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage import akka.persistence.testkit.internal.TestKitStorage
import akka.util.ccompat.JavaConverters._ import akka.util.ccompat.JavaConverters._
@ -19,7 +19,7 @@ import akka.util.ccompat.JavaConverters._
* INTERNAL API * INTERNAL API
*/ */
@InternalApi @InternalApi
private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, PersistentRepr] { private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (PersistentRepr, Metadata)] {
import EventStorage._ import EventStorage._
@ -31,21 +31,23 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per
// and therefore must be done at the same time with the update, not before // and therefore must be done at the same time with the update, not before
updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector) updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector)
override def reprToSeqNum(repr: PersistentRepr): Long = repr.sequenceNr override def reprToSeqNum(repr: (PersistentRepr, Metadata)): Long = repr._1.sequenceNr
def add(elems: immutable.Seq[PersistentRepr]): Unit = def add(elems: immutable.Seq[(PersistentRepr, Metadata)]): Unit =
elems.groupBy(_.persistenceId).foreach(gr => add(gr._1, gr._2)) elems.groupBy(_._1.persistenceId).foreach { gr =>
add(gr._1, gr._2)
}
override protected val DefaultPolicy = JournalPolicies.PassAll override protected val DefaultPolicy = JournalPolicies.PassAll
/** /**
* @throws Exception from StorageFailure in the current writing policy * @throws Exception from StorageFailure in the current writing policy
*/ */
def tryAdd(elems: immutable.Seq[PersistentRepr]): Try[Unit] = { def tryAdd(elems: immutable.Seq[(PersistentRepr, Metadata)]): Try[Unit] = {
val grouped = elems.groupBy(_.persistenceId) val grouped = elems.groupBy(_._1.persistenceId)
val processed = grouped.map { val processed = grouped.map {
case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload))) case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_._1.payload)))
} }
val reduced: ProcessingResult = val reduced: ProcessingResult =
@ -71,8 +73,8 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per
persistenceId: String, persistenceId: String,
fromSequenceNr: Long, fromSequenceNr: Long,
toSequenceNr: Long, toSequenceNr: Long,
max: Long): immutable.Seq[PersistentRepr] = { max: Long): immutable.Seq[(PersistentRepr, Metadata)] = {
val batch = read(persistenceId, fromSequenceNr, toSequenceNr, max) val batch: immutable.Seq[(PersistentRepr, Metadata)] = read(persistenceId, fromSequenceNr, toSequenceNr, max)
currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match { currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match {
case ProcessingSuccess => batch case ProcessingSuccess => batch
case Reject(ex) => throw ex case Reject(ex) => throw ex
@ -96,9 +98,9 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per
} }
} }
private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[PersistentRepr] = { private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[(PersistentRepr, Metadata)] = {
val sn = getHighestSeqNumber(key) + 1 val sn = getHighestSeqNumber(key) + 1
elems.zipWithIndex.map(p => PersistentRepr(p._1, p._2 + sn, key)) elems.zipWithIndex.map(p => (PersistentRepr(p._1, p._2 + sn, key), NoMetadata))
} }
} }
@ -107,6 +109,24 @@ object EventStorage {
object JournalPolicies extends DefaultPolicies[JournalOperation] object JournalPolicies extends DefaultPolicies[JournalOperation]
/**
* INTERNAL API
*/
@InternalApi
private[testkit] sealed trait Metadata
/**
* INTERNAL API
*/
@InternalApi
private[testkit] case object NoMetadata extends Metadata
/**
* INTERNAL API
*/
@InternalApi
private[testkit] final case class WithMetadata(payload: Any) extends Metadata
} }
/** /**

View file

@ -7,13 +7,12 @@ package akka.persistence.testkit
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Try import scala.util.Try
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.{ AsyncWriteJournal, Tagged } import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged }
import akka.persistence.snapshot.SnapshotStore import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
/** /**
@ -25,15 +24,25 @@ import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorag
class PersistenceTestKitPlugin extends AsyncWriteJournal { class PersistenceTestKitPlugin extends AsyncWriteJournal {
private final val storage = InMemStorageExtension(context.system) private final val storage = InMemStorageExtension(context.system)
private val eventStream = context.system.eventStream
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
Future.fromTry(Try(messages.map(aw => { Future.fromTry(Try(messages.map(aw => {
val data = aw.payload.map(pl => val data = aw.payload.map(pl =>
pl.payload match { pl.payload match {
case Tagged(p, _) => pl.withPayload(p) // TODO define how to handle tagged and metadata
case _ => pl case Tagged(p, _) => (pl.withPayload(p).withTimestamp(System.currentTimeMillis()), NoMetadata)
case evt: EventWithMetaData =>
(pl.withPayload(evt.event).withTimestamp(System.currentTimeMillis()), WithMetadata(evt.metaData))
case _ => (pl.withTimestamp(System.currentTimeMillis()), NoMetadata)
}) })
storage.tryAdd(data)
val result: Try[Unit] = storage.tryAdd(data)
result.foreach { _ =>
messages.foreach(aw =>
eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr)))
}
result
}))) })))
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
@ -41,7 +50,8 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal {
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] = recoveryCallback: PersistentRepr => Unit): Future[Unit] =
Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback))) Future.fromTry(
Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(recoveryCallback)))
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
Future.fromTry(Try { Future.fromTry(Try {
@ -64,6 +74,8 @@ object PersistenceTestKitPlugin {
"akka.persistence.journal.plugin" -> PluginId, "akka.persistence.journal.plugin" -> PluginId,
s"$PluginId.class" -> s"${classOf[PersistenceTestKitPlugin].getName}").asJava) s"$PluginId.class" -> s"${classOf[PersistenceTestKitPlugin].getName}").asJava)
private[testkit] case class Write(persistenceId: String, toSequenceNr: Long)
} }
/** /**

View file

@ -4,40 +4,56 @@
package akka.persistence.testkit.internal package akka.persistence.testkit.internal
import scala.util.Try
import akka.actor.{ ActorSystem, ExtendedActorSystem } import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.testkit.EventStorage import akka.persistence.testkit.EventStorage
import akka.serialization.{ Serialization, SerializationExtension } import akka.persistence.testkit.EventStorage.Metadata
import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
@InternalApi
private[testkit] object SerializedEventStorageImpl {
case class Serialized(
persistenceId: String,
sequenceNr: Long,
payloadSerId: Int,
payloadSerManifest: String,
writerUuid: String,
payload: Array[Byte],
metadata: Metadata)
}
/** /**
* INTERNAL API * INTERNAL API
* FIXME, once we add serializers for metadata serialize the metadata payload if present
*/ */
@InternalApi @InternalApi
private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends EventStorage { private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends EventStorage {
override type InternalRepr = Serialized
override type InternalRepr = (Int, Array[Byte])
private lazy val serialization = SerializationExtension(system) private lazy val serialization = SerializationExtension(system)
/** /**
* @return (serializer id, serialized bytes) * @return (serializer id, serialized bytes)
*/ */
override def toInternal(repr: PersistentRepr): (Int, Array[Byte]) = override def toInternal(repr: (PersistentRepr, Metadata)): Serialized =
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
val s = serialization.findSerializerFor(repr) val (pr, meta) = repr
(s.identifier, s.toBinary(repr)) val payload = pr.payload.asInstanceOf[AnyRef]
val s = serialization.findSerializerFor(payload)
val manifest = Serializers.manifestFor(s, payload)
Serialized(pr.persistenceId, pr.sequenceNr, s.identifier, manifest, pr.writerUuid, s.toBinary(payload), meta)
} }
/** /**
* @param internal (serializer id, serialized bytes) * @param internal (serializer id, serialized bytes)
*/ */
override def toRepr(internal: (Int, Array[Byte])): PersistentRepr = override def toRepr(internal: Serialized): (PersistentRepr, Metadata) = {
serialization val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get
.deserialize(internal._2, internal._1, PersistentRepr.Undefined) (
.flatMap(r => Try(r.asInstanceOf[PersistentRepr])) PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid),
.get internal.metadata)
}
} }

View file

@ -7,6 +7,7 @@ package akka.persistence.testkit.internal
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.testkit.EventStorage import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.EventStorage.Metadata
/** /**
* INTERNAL API * INTERNAL API
@ -14,10 +15,10 @@ import akka.persistence.testkit.EventStorage
@InternalApi @InternalApi
private[testkit] class SimpleEventStorageImpl extends EventStorage { private[testkit] class SimpleEventStorageImpl extends EventStorage {
override type InternalRepr = PersistentRepr override type InternalRepr = (PersistentRepr, Metadata)
override def toInternal(repr: PersistentRepr): PersistentRepr = repr override def toInternal(repr: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = repr
override def toRepr(internal: PersistentRepr): PersistentRepr = internal override def toRepr(internal: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = internal
} }

View file

@ -0,0 +1,16 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query
import akka.actor.ExtendedActorSystem
import akka.persistence.query.ReadJournalProvider
class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem) extends ReadJournalProvider {
override def scaladslReadJournal(): scaladsl.PersistenceTestKitReadJournal =
new scaladsl.PersistenceTestKitReadJournal(system)
override def javadslReadJournal(): javadsl.PersistenceTestKitReadJournal =
new javadsl.PersistenceTestKitReadJournal(scaladslReadJournal())
}

View file

@ -0,0 +1,87 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query.internal
import akka.actor.ActorRef
import akka.annotation.InternalApi
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin }
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageLogicWithLogging, OutHandler }
/**
* INTERNAL API
*/
@InternalApi
final private[akka] class EventsByPersistenceIdStage(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
storage: EventStorage)
extends GraphStage[SourceShape[EventEnvelope]] {
val out: Outlet[EventEnvelope] = Outlet("EventsByPersistenceIdSource")
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogicWithLogging(shape) with OutHandler {
private var currentSequenceNr = math.max(fromSequenceNr, 1)
private var stageActorRef: ActorRef = null
override def preStart(): Unit = {
stageActorRef = getStageActor(receiveNotifications).ref
materializer.system.eventStream.subscribe(stageActorRef, classOf[PersistenceTestKitPlugin.Write])
}
private def receiveNotifications(in: (ActorRef, Any)): Unit = {
val (_, msg) = in
msg match {
case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == persistenceId =>
log.debug("Write notification {} {}", pid, toSequenceNr)
if (toSequenceNr >= currentSequenceNr) {
tryPush()
}
case _ =>
}
}
private def tryPush(): Unit = {
if (isAvailable(out)) {
val event = storage.tryRead(persistenceId, currentSequenceNr, currentSequenceNr, 1)
log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event)
event.headOption match {
case Some((pr, meta)) =>
push(
out,
EventEnvelope(
Sequence(pr.sequenceNr),
pr.persistenceId,
pr.sequenceNr,
pr.payload,
pr.timestamp,
meta match {
case NoMetadata => None
case WithMetadata(m) => Some(m)
}))
if (currentSequenceNr == toSequenceNr) {
completeStage()
} else {
currentSequenceNr += 1
}
case None =>
}
} else {
log.debug("tryPush, no demand")
}
}
override def onPull(): Unit = {
tryPush()
}
setHandler(out, this)
}
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query.javadsl
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.javadsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal }
import akka.stream.javadsl.Source
import akka.persistence.testkit.query.scaladsl
object PersistenceTestKitReadJournal {
val Identifier = "akka.persistence.testkit.query"
}
final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitReadJournal)
extends ReadJournal
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery {
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
delegate.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
}

View file

@ -0,0 +1,46 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query.scaladsl
import akka.NotUsed
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata }
import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage
import akka.stream.scaladsl.Source
object PersistenceTestKitReadJournal {
val Identifier = "akka.persistence.testkit.query"
}
final class PersistenceTestKitReadJournal(system: ExtendedActorSystem)
extends ReadJournal
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery {
private final val storage: EventStorage = InMemStorageExtension(system)
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
Source.fromGraph(new EventsByPersistenceIdStage(persistenceId, fromSequenceNr, toSequenceNr, storage))
}
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map {
case (pr, meta) =>
EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, meta match {
case NoMetadata => None
case WithMetadata(payload) => Some(payload)
})
}
}
}

View file

@ -7,9 +7,7 @@ package akka.persistence.testkit.scaladsl
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.util.Try import scala.util.Try
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
@ -20,6 +18,7 @@ import akka.annotation.ApiMayChange
import akka.persistence.Persistence import akka.persistence.Persistence
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.SnapshotMetadata import akka.persistence.SnapshotMetadata
import akka.persistence.testkit.EventStorage.Metadata
import akka.persistence.testkit._ import akka.persistence.testkit._
import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension
@ -424,9 +423,9 @@ object SnapshotTestKit {
*/ */
@ApiMayChange @ApiMayChange
class PersistenceTestKit(system: ActorSystem) class PersistenceTestKit(system: ActorSystem)
extends PersistenceTestKitOps[PersistentRepr, JournalOperation] extends PersistenceTestKitOps[(PersistentRepr, Metadata), JournalOperation]
with ExpectOps[PersistentRepr] with ExpectOps[(PersistentRepr, Metadata)]
with HasStorage[JournalOperation, PersistentRepr] { with HasStorage[JournalOperation, (PersistentRepr, Metadata)] {
require( require(
Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess, Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess,
"The test persistence plugin is not configured.") "The test persistence plugin is not configured.")
@ -495,7 +494,7 @@ class PersistenceTestKit(system: ActorSystem)
def persistedInStorage(persistenceId: String): immutable.Seq[Any] = def persistedInStorage(persistenceId: String): immutable.Seq[Any] =
storage.read(persistenceId).getOrElse(List.empty).map(reprToAny) storage.read(persistenceId).getOrElse(List.empty).map(reprToAny)
override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload override private[testkit] def reprToAny(repr: (PersistentRepr, Metadata)): Any = repr._1.payload
} }
@ApiMayChange @ApiMayChange

View file

@ -0,0 +1,169 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query
import akka.{ Done, NotUsed }
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorRef
import akka.persistence.journal.EventWithMetaData
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.{ EventAdapter, EventSeq, PersistenceId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
object EventsByPersistenceIdSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.persistence.testkit.events.serialize = off
"""))
case class Command(evt: String, ack: ActorRef[Done])
case class State()
def testBehaviour(persistenceId: String) = {
EventSourcedBehavior[Command, String, State](
PersistenceId.ofUniqueId(persistenceId),
State(),
(_, command) =>
Effect.persist(command.evt).thenRun { _ =>
command.ack ! Done
},
(state, _) => state)
}.eventAdapter(new EventAdapter[String, Any] {
override def toJournal(e: String): Any = {
if (e.startsWith("m")) {
EventWithMetaData(e, s"$e-meta")
} else {
e
}
}
override def manifest(event: String): String = ""
override def fromJournal(p: Any, manifest: String): EventSeq[String] = p match {
case e: EventWithMetaData => EventSeq.single(e.event.toString)
case _ => EventSeq.single(p.toString)
}
})
}
class EventsByPersistenceIdSpec
extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config)
with LogCapturing
with AnyWordSpecLike {
import EventsByPersistenceIdSpec._
implicit val classic = system.classicSystem
val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
def setup(persistenceId: String): ActorRef[Command] = {
val probe = createTestProbe[Done]()
val ref = setupEmpty(persistenceId)
ref ! Command(s"$persistenceId-1", probe.ref)
ref ! Command(s"$persistenceId-2", probe.ref)
ref ! Command(s"$persistenceId-3", probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
probe.expectMessage(Done)
ref
}
def setupEmpty(persistenceId: String): ActorRef[Command] = {
spawn(testBehaviour(persistenceId))
}
"Persistent test kit live query EventsByPersistenceId" must {
"find new events" in {
val ackProbe = createTestProbe[Done]()
val ref = setup("c")
val src = queries.eventsByPersistenceId("c", 0L, Long.MaxValue)
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3")
ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)
probe.expectNext("c-4")
}
"find new events up to a sequence number" in {
val ackProbe = createTestProbe[Done]()
val ref = setup("d")
val src = queries.eventsByPersistenceId("d", 0L, 4L)
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("d-1", "d-2", "d-3")
ref ! Command("d-4", ackProbe.ref)
ackProbe.expectMessage(Done)
probe.expectNext("d-4").expectComplete()
}
"find new events after demand request" in {
val ackProbe = createTestProbe[Done]()
val ref = setup("e")
val src = queries.eventsByPersistenceId("e", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("e-1", "e-2").expectNoMessage(100.millis)
ref ! Command("e-4", ackProbe.ref)
ackProbe.expectMessage(Done)
probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4")
}
"include timestamp in EventEnvelope" in {
setup("n")
val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue)
val probe = src.runWith(TestSink.probe[EventEnvelope])
probe.request(5)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
"not complete for empty persistence id" in {
val ackProbe = createTestProbe[Done]()
val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2)
probe.expectNoMessage(200.millis) // must not complete
val ref = setupEmpty("o")
ref ! Command("o-1", ackProbe.ref)
ackProbe.expectMessage(Done)
probe.cancel()
}
"return metadata in queries" in {
val ackProbe = createTestProbe[Done]()
val ref = setupEmpty("with-meta")
ref ! Command("m-1", ackProbe.ref)
ref ! Command("m-2", ackProbe.ref)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("with-meta", 0L, Long.MaxValue)
val probe =
src.runWith(TestSink.probe[Any]).request(3)
probe.expectNextPF {
case e @ EventEnvelope(_, "with-meta", 1L, "m-1") if e.eventMetadata.contains("m-1-meta") =>
}
probe.expectNextPF {
case e @ EventEnvelope(_, "with-meta", 2L, "m-2") if e.eventMetadata.contains("m-2-meta") =>
}
}
}
}

View file

@ -234,7 +234,7 @@ class EventSourcedBehaviorTestKitSpec
val eventSourcedTestKit = createTestKit() val eventSourcedTestKit = createTestKit()
val exc = intercept[IllegalArgumentException] { val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply(_)) eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply)
} }
(exc.getMessage should include).regex("Reply.*isn't serializable") (exc.getMessage should include).regex("Reply.*isn't serializable")
exc.getCause.getClass should ===(classOf[NotSerializableException]) exc.getCause.getClass should ===(classOf[NotSerializableException])

View file

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Silence initial setup logging from Logback -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n</pattern>
</encoder>
</appender>
<!--
Logging from tests are silenced by this appender. When there is a test failure
the captured logging events are flushed to the appenders defined for the
akka.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
-->
<appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
<!--
The appenders defined for this CapturingAppenderDelegate logger are used
when there is a test failure and all logging events from the test are
flushed to these appenders.
-->
<logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>

View file

@ -2,30 +2,23 @@
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com> * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/ */
package docs.akka.persistence.typed.aa package docs.akka.persistence.typed
import java.time.Instant import java.time.Instant
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ }
import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior } import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import com.typesafe.config.ConfigFactory import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
object AAAuctionExampleSpec { object AAAuctionExampleSpec {
val config = ConfigFactory.parseString("""
akka.actor.provider = cluster
akka.loglevel = info
akka.persistence {
journal {
plugin = "akka.persistence.journal.inmem"
}
}
""")
type MoneyAmount = Int type MoneyAmount = Int
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)
@ -38,7 +31,7 @@ object AAAuctionExampleSpec {
final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand
private final case object Close extends AuctionCommand // Internal, should not be sent from the outside private final case object Close extends AuctionCommand // Internal, should not be sent from the outside
sealed trait AuctionEvent sealed trait AuctionEvent extends CborSerializable
final case class BidRegistered(bid: Bid) extends AuctionEvent final case class BidRegistered(bid: Bid) extends AuctionEvent
final case class AuctionFinished(atDc: String) extends AuctionEvent final case class AuctionFinished(atDc: String) extends AuctionEvent
final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
@ -208,7 +201,7 @@ object AAAuctionExampleSpec {
def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx => ctx =>
ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, LeveldbReadJournal.Identifier) { aaCtx => ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
EventSourcedBehavior( EventSourcedBehavior(
aaCtx.persistenceId, aaCtx.persistenceId,
initialState(setup), initialState(setup),
@ -219,7 +212,7 @@ object AAAuctionExampleSpec {
} }
class AAAuctionExampleSpec class AAAuctionExampleSpec
extends ScalaTestWithActorTestKit(AAAuctionExampleSpec.config) extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike with AnyWordSpecLike
with Matchers with Matchers
with LogCapturing with LogCapturing
@ -228,6 +221,7 @@ class AAAuctionExampleSpec
import AAAuctionExampleSpec._ import AAAuctionExampleSpec._
"Auction example" should { "Auction example" should {
"work" in { "work" in {
val Replicas = Set("DC-A", "DC-B") val Replicas = Set("DC-A", "DC-B")
val setupA = val setupA =

View file

@ -2,41 +2,22 @@
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com> * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/ */
package docs.akka.persistence.typed.aa package docs.akka.persistence.typed
import java.util.UUID
import akka.Done import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl._ import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Span } import org.scalatest.time.{ Millis, Span }
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
object AABlogExampleSpec { object AABlogExampleSpec {
val config =
ConfigFactory.parseString(s"""
akka.actor.allow-java-serialization = true
// FIXME serializers for replicated event or akka persistence support for metadata: https://github.com/akka/akka/issues/29260
akka.actor.provider = cluster
akka.loglevel = debug
akka.persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb {
native = off
dir = "target/journal-AABlogExampleSpec-${UUID.randomUUID()}"
}
}
}
""")
final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) { final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) {
def withContent(newContent: PostContent, timestamp: LwwTime): BlogState = def withContent(newContent: PostContent, timestamp: LwwTime): BlogState =
@ -62,7 +43,7 @@ object AABlogExampleSpec {
} }
class AABlogExampleSpec class AABlogExampleSpec
extends ScalaTestWithActorTestKit(AABlogExampleSpec.config) extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike with AnyWordSpecLike
with Matchers with Matchers
with LogCapturing with LogCapturing
@ -124,7 +105,7 @@ class AABlogExampleSpec
"work" in { "work" in {
val refDcA: ActorRef[BlogCommand] = val refDcA: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx => spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) {
(aa: ActiveActiveContext) => (aa: ActiveActiveContext) =>
behavior(aa, ctx) behavior(aa, ctx)
} }
@ -132,7 +113,7 @@ class AABlogExampleSpec
val refDcB: ActorRef[BlogCommand] = val refDcB: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx => spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) {
(aa: ActiveActiveContext) => (aa: ActiveActiveContext) =>
behavior(aa, ctx) behavior(aa, ctx)
} }

View file

@ -0,0 +1,6 @@
# Changes to internal/private
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withActiveActive")
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.Running*")
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl.*")
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.BehaviorSetup*")

View file

@ -263,5 +263,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
extends InternalProtocol extends InternalProtocol
} }
final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) // FIXME serializer
case object ReplicatedEventAck @InternalApi
private[akka] final case class ReplicatedEventMetaData(originDc: String)
@InternalApi
private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long)
@InternalApi
private[akka] case object ReplicatedEventAck

View file

@ -23,7 +23,7 @@ import akka.persistence.PersistentRepr
import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess import akka.persistence.SaveSnapshotSuccess
import akka.persistence.SnapshotProtocol import akka.persistence.SnapshotProtocol
import akka.persistence.journal.Tagged import akka.persistence.journal.{ EventWithMetaData, Tagged }
import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.persistence.typed.{ import akka.persistence.typed.{
@ -121,7 +121,11 @@ private[akka] object Running {
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
(eventEnvelope, replyTo) => (eventEnvelope, replyTo) =>
ReplicatedEventEnvelope(eventEnvelope.event.asInstanceOf[ReplicatedEvent[E]], replyTo) val re = ReplicatedEvent[E](
eventEnvelope.event.asInstanceOf[E],
eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originDc,
eventEnvelope.sequenceNr) // FIXME, this is the wrong sequence nr, we need origin sequence nr, follow up with tests that show this
ReplicatedEventEnvelope(re, replyTo)
}) })
} }
@ -206,8 +210,9 @@ private[akka] object Running {
private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1 _currentSequenceNumber = state.seqNr + 1
val replicatedEvent = new EventWithMetaData(event.event, ReplicatedEventMetaData(event.originReplica))
val newState: RunningState[S] = state.applyEvent(setup, event.event) val newState: RunningState[S] = state.applyEvent(setup, event.event)
val newState2: RunningState[S] = internalPersist(setup.context, null, newState, event, "") val newState2: RunningState[S] = internalPersist(setup.context, null, newState, replicatedEvent, "")
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
@ -241,10 +246,11 @@ private[akka] object Running {
// the invalid event, in case such validation is implemented in the event handler. // the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event // also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1 _currentSequenceNumber = state.seqNr + 1
val newState = state.applyEvent(setup, event) val newState: RunningState[S] = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event) val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event) val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.activeActive match { val newState2 = setup.activeActive match {
case Some(aa) => case Some(aa) =>
val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber) val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber)

View file

@ -0,0 +1,4 @@
# Changes to internal/private
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemMessages.*")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemJournal.*")

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.journal
object EventWithMetaData {
def apply(event: Any, metaData: Any): EventWithMetaData = new EventWithMetaData(event, metaData)
/**
* If meta data could not be deserialized it will not fail the replay/query.
* The "invalid" meta data is represented with this `UnknownMetaData` and
* it and the event will be wrapped in `EventWithMetaData`.
*
* The reason for not failing the replay/query is that meta data should be
* optional, e.g. the tool that wrote the meta data has been removed. This
* is typically because the serializer for the meta data has been removed
* from the class path (or configuration).
*/
final class UnknownMetaData(val serializerId: Int, val manifest: String)
}
/**
* If the event is wrapped in this class the `metaData` will
* be serialized and stored separately from the event payload. This can be used by event
* adapters or other tools to store additional meta data without altering
* the actual domain event.
*
* Check the documentation of the persistence plugin in use to use if it supports
* EventWithMetaData.
*/
final class EventWithMetaData(val event: Any, val metaData: Any)

View file

@ -4,21 +4,25 @@
package akka.persistence.journal.inmem package akka.persistence.journal.inmem
import akka.actor.ActorRef
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Try import scala.util.Try
import scala.util.control.NonFatal import scala.util.control.NonFatal
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.annotation.ApiMayChange import akka.annotation.ApiMayChange
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.AtomicWrite import akka.persistence.AtomicWrite
import akka.persistence.JournalProtocol.RecoverySuccess
import akka.persistence.PersistentRepr import akka.persistence.PersistentRepr
import akka.persistence.journal.{ AsyncWriteJournal, Tagged } import akka.persistence.journal.inmem.InmemJournal.{ MessageWithMeta, ReplayWithMeta }
import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.serialization.Serializers import akka.serialization.Serializers
import akka.util.OptionVal
/** /**
* The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to * The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to
@ -32,8 +36,17 @@ object InmemJournal {
sealed trait Operation sealed trait Operation
final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation
final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation
@InternalApi
private[persistence] case class ReplayWithMeta(
from: Long,
to: Long,
limit: Long,
persistenceId: String,
replyTo: ActorRef)
@InternalApi
private[persistence] case class MessageWithMeta(pr: PersistentRepr, meta: OptionVal[Any])
} }
/** /**
@ -45,6 +58,8 @@ object InmemJournal {
def this() = this(ConfigFactory.empty()) def this() = this(ConfigFactory.empty())
private val log = Logging(context.system, classOf[InmemJournal])
private val testSerialization = { private val testSerialization = {
val key = "test-serialization" val key = "test-serialization"
if (cfg.hasPath(key)) cfg.getBoolean("test-serialization") if (cfg.hasPath(key)) cfg.getBoolean("test-serialization")
@ -78,7 +93,9 @@ object InmemJournal {
recoveryCallback: PersistentRepr => Unit): Future[Unit] = { recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
val highest = highestSequenceNr(persistenceId) val highest = highestSequenceNr(persistenceId)
if (highest != 0L && max != 0L) if (highest != 0L && max != 0L)
read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach(recoveryCallback) read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach {
case (pr, _) => recoveryCallback(pr)
}
Future.successful(()) Future.successful(())
} }
@ -93,6 +110,19 @@ object InmemJournal {
Future.successful(()) Future.successful(())
} }
override def receivePluginInternal: Receive = {
case ReplayWithMeta(fromSequenceNr, toSequenceNr, max, persistenceId, replyTo) =>
log.debug("ReplayWithMeta {} {} {} {}", fromSequenceNr, toSequenceNr, max, persistenceId)
val highest = highestSequenceNr(persistenceId)
if (highest != 0L && max != 0L) {
read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach {
case (pr, meta) => replyTo ! MessageWithMeta(pr, meta)
}
}
replyTo ! RecoverySuccess(highest)
}
private def verifySerialization(event: Any): Unit = { private def verifySerialization(event: Any): Unit = {
if (testSerialization) { if (testSerialization) {
val eventAnyRef = event.asInstanceOf[AnyRef] val eventAnyRef = event.asInstanceOf[AnyRef]
@ -109,31 +139,35 @@ object InmemJournal {
*/ */
@InternalApi private[persistence] trait InmemMessages { @InternalApi private[persistence] trait InmemMessages {
// persistenceId -> persistent message // persistenceId -> persistent message
var messages = Map.empty[String, Vector[PersistentRepr]] var messages = Map.empty[String, Vector[(PersistentRepr, OptionVal[Any])]]
// persistenceId -> highest used sequence number // persistenceId -> highest used sequence number
private var highestSequenceNumbers = Map.empty[String, Long] private var highestSequenceNumbers = Map.empty[String, Long]
// FIXME, which way around should Tagged/EventWithMeta go? https://github.com/akka/akka/issues/29284
def add(p: PersistentRepr): Unit = { def add(p: PersistentRepr): Unit = {
val pr = p.payload match { val pr = p.payload match {
case Tagged(payload, _) => p.withPayload(payload) case Tagged(payload, _) => (p.withPayload(payload).withTimestamp(System.currentTimeMillis()), OptionVal.None)
case _ => p case meta: EventWithMetaData =>
(p.withPayload(meta.event).withTimestamp(System.currentTimeMillis()), OptionVal.Some(meta.metaData))
case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None)
} }
messages = messages + (messages.get(pr.persistenceId) match {
case Some(ms) => pr.persistenceId -> (ms :+ pr) messages = messages + (messages.get(p.persistenceId) match {
case None => pr.persistenceId -> Vector(pr) case Some(ms) => p.persistenceId -> (ms :+ pr)
case None => p.persistenceId -> Vector(pr)
}) })
highestSequenceNumbers = highestSequenceNumbers =
highestSequenceNumbers.updated(pr.persistenceId, math.max(highestSequenceNr(pr.persistenceId), pr.sequenceNr)) highestSequenceNumbers.updated(p.persistenceId, math.max(highestSequenceNr(p.persistenceId), p.sequenceNr))
} }
def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match { def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match {
case Some(ms) => messages + (pid -> ms.filterNot(_.sequenceNr == snr)) case Some(ms) => messages + (pid -> ms.filterNot(_._1.sequenceNr == snr))
case None => messages case None => messages
} }
def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[PersistentRepr] = def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[(PersistentRepr, OptionVal[Any])] =
messages.get(pid) match { messages.get(pid) match {
case Some(ms) => ms.filter(m => m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr).take(safeLongToInt(max)) case Some(ms) => ms.filter(m => m._1.sequenceNr >= fromSnr && m._1.sequenceNr <= toSnr).take(safeLongToInt(max))
case None => Nil case None => Nil
} }

View file

@ -39,10 +39,10 @@ object EventSourcedActorFailureSpec {
val readFromStore = read(persistenceId, fromSequenceNr, toSequenceNr, max) val readFromStore = read(persistenceId, fromSequenceNr, toSequenceNr, max)
if (readFromStore.isEmpty) if (readFromStore.isEmpty)
Future.successful(()) Future.successful(())
else if (isCorrupt(readFromStore)) else if (isCorrupt(readFromStore.map(_._1)))
Future.failed(new SimulatedException(s"blahonga $fromSequenceNr $toSequenceNr")) Future.failed(new SimulatedException(s"blahonga $fromSequenceNr $toSequenceNr"))
else { else {
readFromStore.foreach(recoveryCallback) readFromStore.map(_._1).foreach(recoveryCallback)
Future.successful(()) Future.successful(())
} }
} }

View file

@ -66,11 +66,11 @@ class ChaosJournal extends AsyncWriteJournal {
replayCallback: (PersistentRepr) => Unit): Future[Unit] = replayCallback: (PersistentRepr) => Unit): Future[Unit] =
if (shouldFail(replayFailureRate)) { if (shouldFail(replayFailureRate)) {
val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max) val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max)
val sm = rm.take(random.nextInt(rm.length + 1)) val sm = rm.take(random.nextInt(rm.length + 1)).map(_._1)
sm.foreach(replayCallback) sm.foreach(replayCallback)
Future.failed(new ReplayFailedException(sm)) Future.failed(new ReplayFailedException(sm))
} else { } else {
read(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) read(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(replayCallback)
Future.successful(()) Future.successful(())
} }

View file

@ -314,6 +314,13 @@ lazy val persistenceTestkit = akkaModule("akka-persistence-testkit")
.settings(AutomaticModuleName.settings("akka.persistence.testkit")) .settings(AutomaticModuleName.settings("akka.persistence.testkit"))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
lazy val persistenceTypedTests = akkaModule("akka-persistence-typed-tests")
.dependsOn(persistenceTyped, persistenceTestkit % "test", actorTestkitTyped % "test", jackson % "test->test")
.settings(AkkaBuild.mayChangeSettings)
.settings(Dependencies.persistenceTypedTests)
.disablePlugins(MimaPlugin)
.enablePlugins(NoPublish)
lazy val protobuf = akkaModule("akka-protobuf") lazy val protobuf = akkaModule("akka-protobuf")
.settings(OSGi.protobuf) .settings(OSGi.protobuf)
.settings(AutomaticModuleName.settings("akka.protobuf")) .settings(AutomaticModuleName.settings("akka.protobuf"))

View file

@ -255,6 +255,8 @@ object Dependencies {
val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback) val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback)
val persistenceTypedTests = l ++= Seq(Test.scalatest, Test.logback)
val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.logback) val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.logback)
val jackson = l ++= Seq( val jackson = l ++= Seq(