Serializer for EventEnvelope and Offset (#30970)

This commit is contained in:
Patrik Nordwall 2021-12-09 13:30:59 +01:00 committed by GitHub
parent 3a7201183d
commit f946655ad5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 2425 additions and 2 deletions

View file

@ -0,0 +1,24 @@
/*
* Copyright (C) 2014-2021 Lightbend Inc. <https://www.lightbend.com>
*/
syntax = "proto2";
package akka.persistence.query;
option java_package = "akka.persistence.query.internal.protobuf";
option optimize_for = SPEED;
import "ContainerFormats.proto";
// for akka.persistence.query.typed.EventEnvelope
message EventEnvelope {
required string persistence_id = 1;
required string entity_type = 2;
required int32 slice = 3;
required int64 sequence_nr = 4;
required int64 timestamp = 5;
required string offset = 6;
required string offset_manifest = 7;
optional Payload event = 8;
optional Payload metadata = 9;
}

View file

@ -27,3 +27,16 @@ akka.persistence.query.journal.leveldb {
max-buffer-size = 100 max-buffer-size = 100
} }
#//#query-leveldb #//#query-leveldb
akka.actor {
serializers {
akka-persistence-query = "akka.persistence.query.internal.QuerySerializer"
}
serialization-bindings {
"akka.persistence.query.typed.EventEnvelope" = akka-persistence-query
"akka.persistence.query.Offset" = akka-persistence-query
}
serialization-identifiers {
"akka.persistence.query.internal.QuerySerializer" = 39
}
}

View file

@ -0,0 +1,222 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.internal
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets.UTF_8
import java.time.Instant
import java.util.Base64
import java.util.UUID
import scala.util.control.NonFatal
import akka.annotation.InternalApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.Sequence
import akka.persistence.query.TimeBasedUUID
import akka.persistence.query.TimestampOffset
import akka.persistence.query.internal.protobuf.QueryMessages
import akka.persistence.query.typed.EventEnvelope
import akka.remote.serialization.WrappedPayloadSupport
import akka.serialization.BaseSerializer
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.serialization.Serializers
/**
* INTERNAL API
*
* Serializer for [[EventEnvelope]] and [[Offset]].
*/
@InternalApi private[akka] final class QuerySerializer(val system: akka.actor.ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer {
private val payloadSupport = new WrappedPayloadSupport(system)
private lazy val serialization = SerializationExtension(system)
private final val EventEnvelopeManifest = "a"
private final val SequenceOffsetManifest = "SEQ"
private final val TimeBasedUUIDOffsetManifest = "TBU"
private final val TimestampOffsetManifest = "TSO"
private final val NoOffsetManifest = "NO"
private val manifestSeparator = ':'
// persistenceId and timestamp must not contain this separator char
private val timestampOffsetSeparator = ';'
override def manifest(o: AnyRef): String = o match {
case _: EventEnvelope[_] => EventEnvelopeManifest
case offset: Offset => toStorageRepresentation(offset)._2
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case env: EventEnvelope[_] =>
val builder = QueryMessages.EventEnvelope.newBuilder()
val (offset, offsetManifest) = toStorageRepresentation(env.offset)
builder
.setPersistenceId(env.persistenceId)
.setEntityType(env.entityType)
.setSlice(env.slice)
.setSequenceNr(env.sequenceNr)
.setTimestamp(env.timestamp)
.setOffset(offset)
.setOffsetManifest(offsetManifest)
env.eventOption.foreach(event => builder.setEvent(payloadSupport.payloadBuilder(event)))
env.eventMetadata.foreach(meta => builder.setMetadata(payloadSupport.payloadBuilder(meta)))
builder.build().toByteArray()
case offset: Offset =>
toStorageRepresentation(offset)._1.getBytes(UTF_8)
case _ =>
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case EventEnvelopeManifest =>
val env = QueryMessages.EventEnvelope.parseFrom(bytes)
val offset = fromStorageRepresentation(env.getOffset, env.getOffsetManifest)
val eventOption =
if (env.hasEvent) Option(payloadSupport.deserializePayload(env.getEvent))
else None
val metaOption =
if (env.hasMetadata) Option(payloadSupport.deserializePayload(env.getMetadata))
else None
new EventEnvelope(
offset,
env.getPersistenceId,
env.getSequenceNr,
eventOption,
env.getTimestamp,
metaOption,
env.getEntityType,
env.getSlice)
case _ =>
fromStorageRepresentation(new String(bytes, UTF_8), manifest)
}
/**
* Deserialize an offset from a stored string representation and manifest.
* The offset is converted from its string representation to its real type.
*/
private def fromStorageRepresentation(offsetStr: String, manifest: String): Offset = {
manifest match {
case TimestampOffsetManifest => timestampOffsetFromStorageRepresentation(offsetStr)
case SequenceOffsetManifest => Offset.sequence(offsetStr.toLong)
case TimeBasedUUIDOffsetManifest => Offset.timeBasedUUID(UUID.fromString(offsetStr))
case NoOffsetManifest => NoOffset
case _ =>
manifest.split(manifestSeparator) match {
case Array(serializerIdStr, serializerManifest) =>
val serializerId = serializerIdStr.toInt
val bytes = Base64.getDecoder.decode(offsetStr)
serialization.deserialize(bytes, serializerId, serializerManifest).get match {
case offset: Offset => offset
case other =>
throw new NotSerializableException(
s"Unimplemented deserialization of offset with serializerId [$serializerId] and manifest [$manifest] " +
s"in [${getClass.getName}]. [${other.getClass.getName}] is not an Offset.")
}
case _ =>
throw new NotSerializableException(
s"Unimplemented deserialization of offset with manifest [$manifest] " +
s"in [${getClass.getName}]. [$manifest] doesn't contain two parts.")
}
}
}
/**
* Convert the offset to a tuple (String, String) where the first element is
* the String representation of the offset and the second element is its manifest.
*/
private def toStorageRepresentation(offset: Offset): (String, String) = {
offset match {
case t: TimestampOffset => (timestampOffsetToStorageRepresentation(t), TimestampOffsetManifest)
case seq: Sequence => (seq.value.toString, SequenceOffsetManifest)
case tbu: TimeBasedUUID => (tbu.value.toString, TimeBasedUUIDOffsetManifest)
case NoOffset => ("", NoOffsetManifest)
case _ =>
val obj = offset.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(obj)
val serializerId = serializer.identifier
val serializerManifest = Serializers.manifestFor(serializer, obj)
val bytes = serializer.toBinary(obj)
val offsetStr = Base64.getEncoder.encodeToString(bytes)
if (serializerManifest.contains(manifestSeparator))
throw new IllegalArgumentException(
s"Serializer manifest [$serializerManifest] for " +
s"offset [${offset.getClass.getName}] must not contain [$manifestSeparator] character.")
(offsetStr, s"$serializerId$manifestSeparator$serializerManifest")
}
}
private def timestampOffsetFromStorageRepresentation(str: String): TimestampOffset = {
try {
str.split(timestampOffsetSeparator) match {
case Array(timestamp, readTimestamp, pid, seqNr) =>
// optimized for the normal case
TimestampOffset(Instant.parse(timestamp), Instant.parse(readTimestamp), Map(pid -> seqNr.toLong))
case Array(timestamp) =>
TimestampOffset(Instant.parse(timestamp), Map.empty)
case Array(timestamp, readTimestamp) =>
TimestampOffset(Instant.parse(timestamp), Instant.parse(readTimestamp), Map.empty)
case parts =>
val seen = parts.toList
.drop(2)
.grouped(2)
.map {
case pid :: seqNr :: Nil => pid -> seqNr.toLong
case _ =>
throw new IllegalArgumentException(
s"Invalid representation of Map(pid -> seqNr) [${parts.toList.drop(1).mkString(",")}]")
}
.toMap
TimestampOffset(Instant.parse(parts(0)), Instant.parse(parts(1)), seen)
}
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Unexpected serialized TimestampOffset format [$str].", e)
}
}
private def timestampOffsetToStorageRepresentation(offset: TimestampOffset): String = {
def checkSeparator(pid: String): Unit =
if (pid.contains(timestampOffsetSeparator))
throw new IllegalArgumentException(
s"persistenceId [$pid] in offset [$offset] " +
s"must not contain [$timestampOffsetSeparator] character")
val str = new java.lang.StringBuilder
str.append(offset.timestamp).append(timestampOffsetSeparator).append(offset.readTimestamp)
if (offset.seen.size == 1) {
// optimized for the normal case
val pid = offset.seen.head._1
checkSeparator(pid)
val seqNr = offset.seen.head._2
str.append(timestampOffsetSeparator).append(pid).append(timestampOffsetSeparator).append(seqNr)
} else if (offset.seen.nonEmpty) {
offset.seen.toList.sortBy(_._1).foreach {
case (pid, seqNr) =>
checkSeparator(pid)
str.append(timestampOffsetSeparator).append(pid).append(timestampOffsetSeparator).append(seqNr)
}
}
str.toString
}
}

View file

@ -112,5 +112,5 @@ final class EventEnvelope[Event](
} }
override def toString: String = override def toString: String =
s"EventBySliceEnvelope($offset,$persistenceId,$sequenceNr,$eventOption,$timestamp,$eventMetadata,$entityType,$slice)" s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventOption,$timestamp,$eventMetadata,$entityType,$slice)"
} }

View file

@ -0,0 +1,93 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.internal
import java.time.Instant
import java.util.UUID
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.TimeBasedUUID
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.testkit.AkkaSpec
class QuerySerializerSpec extends AkkaSpec {
private val serialization = SerializationExtension(system)
def verifySerialization(obj: AnyRef): Unit = {
val serializer = serialization.findSerializerFor(obj).asInstanceOf[SerializerWithStringManifest]
val manifest = serializer.manifest(obj)
val bytes = serialization.serialize(obj).get
val deserialzied = serialization.deserialize(bytes, serializer.identifier, manifest).get
deserialzied shouldBe obj
}
"Query serializer" should {
"serialize EventEnvelope with Sequence Offset" in {
verifySerialization(
EventEnvelope(Sequence(1L), "TestEntity|id1", 3L, "event1", System.currentTimeMillis(), "TestEntity", 5))
}
"serialize EventEnvelope with Meta" in {
verifySerialization(
new EventEnvelope(
Sequence(1L),
"TestEntity|id1",
3L,
Some("event1"),
System.currentTimeMillis(),
Some("some-meta"),
"TestEntity",
5))
}
"serialize EventEnvelope with Timestamp Offset" in {
verifySerialization(
EventEnvelope(
TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3)),
"TestEntity|id1",
3L,
"event1",
System.currentTimeMillis(),
"TestEntity",
5))
}
"serialize EventEnvelope with TimeBasedUUID Offset" in {
//2019-12-16T15:32:36.148Z[UTC]
val uuidString = "49225740-2019-11ea-a752-ffae2393b6e4"
val timeUuidOffset = TimeBasedUUID(UUID.fromString(uuidString))
verifySerialization(
EventEnvelope(timeUuidOffset, "TestEntity|id1", 3L, "event1", System.currentTimeMillis(), "TestEntity", 5))
}
"serialize Sequence Offset" in {
verifySerialization(Sequence(0))
}
"serialize Timestamp Offset" in {
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3)))
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3, "pid2" -> 4)))
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map.empty))
verifySerialization(TimestampOffset(Instant.now(), Map.empty))
}
"serialize TimeBasedUUID Offset" in {
//2019-12-16T15:32:36.148Z[UTC]
val uuidString = "49225740-2019-11ea-a752-ffae2393b6e4"
val timeUuidOffset = TimeBasedUUID(UUID.fromString(uuidString))
verifySerialization(timeUuidOffset)
}
"serialize NoOffset" in {
verifySerialization(NoOffset)
}
}
}

View file

@ -291,10 +291,18 @@ lazy val persistence = akkaModule("akka-persistence")
.settings(crossScalaVersions -= akka.Dependencies.scala3Version) .settings(crossScalaVersions -= akka.Dependencies.scala3Version)
lazy val persistenceQuery = akkaModule("akka-persistence-query") lazy val persistenceQuery = akkaModule("akka-persistence-query")
.dependsOn(stream, persistence % "compile->compile;test->test", streamTestkit % "test") .dependsOn(
stream,
persistence % "compile->compile;test->test",
remote % "provided",
protobufV3 % "provided",
streamTestkit % "test")
.settings(Dependencies.persistenceQuery) .settings(Dependencies.persistenceQuery)
.settings(AutomaticModuleName.settings("akka.persistence.query")) .settings(AutomaticModuleName.settings("akka.persistence.query"))
.settings(OSGi.persistenceQuery) .settings(OSGi.persistenceQuery)
.settings(Protobuf.settings)
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
.settings(Test / fork := true) .settings(Test / fork := true)
.enablePlugins(ScaladocNoVerificationOfDiagrams) .enablePlugins(ScaladocNoVerificationOfDiagrams)

View file

@ -36,6 +36,8 @@ object AkkaDisciplinePlugin extends AutoPlugin {
"akka-cluster-sharding-typed", "akka-cluster-sharding-typed",
// references to deprecated PARSER fields in generated message formats? // references to deprecated PARSER fields in generated message formats?
"akka-persistence-typed", "akka-persistence-typed",
// references to deprecated PARSER fields in generated message formats?
"akka-persistence-query",
"akka-docs") "akka-docs")
val looseProjects = Set( val looseProjects = Set(