Delay run-time dependency from persistence-query to remote (#31199)
Scala 3 refers to the types of lazy fields in the `clinit` since
https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html
6c9c733896 (r62879336)
This commit is contained in:
parent
f224336bf8
commit
31d632e82e
2 changed files with 30 additions and 11 deletions
|
|
@ -9,10 +9,9 @@ import java.nio.charset.StandardCharsets.UTF_8
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
import akka.event.Logging
|
||||||
import akka.persistence.query.NoOffset
|
import akka.persistence.query.NoOffset
|
||||||
import akka.persistence.query.Offset
|
import akka.persistence.query.Offset
|
||||||
import akka.persistence.query.Sequence
|
import akka.persistence.query.Sequence
|
||||||
|
|
@ -20,7 +19,7 @@ import akka.persistence.query.TimeBasedUUID
|
||||||
import akka.persistence.query.TimestampOffset
|
import akka.persistence.query.TimestampOffset
|
||||||
import akka.persistence.query.internal.protobuf.QueryMessages
|
import akka.persistence.query.internal.protobuf.QueryMessages
|
||||||
import akka.persistence.query.typed.EventEnvelope
|
import akka.persistence.query.typed.EventEnvelope
|
||||||
import akka.remote.serialization.WrappedPayloadSupport
|
import akka.remote.serialization.WrappedPayloadSupport.{ deserializePayload, payloadBuilder }
|
||||||
import akka.serialization.BaseSerializer
|
import akka.serialization.BaseSerializer
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import akka.serialization.SerializerWithStringManifest
|
import akka.serialization.SerializerWithStringManifest
|
||||||
|
|
@ -35,7 +34,8 @@ import akka.serialization.Serializers
|
||||||
extends SerializerWithStringManifest
|
extends SerializerWithStringManifest
|
||||||
with BaseSerializer {
|
with BaseSerializer {
|
||||||
|
|
||||||
private lazy val payloadSupport = new WrappedPayloadSupport(system)
|
private val log = Logging(system, classOf[QuerySerializer])
|
||||||
|
|
||||||
private lazy val serialization = SerializationExtension(system)
|
private lazy val serialization = SerializationExtension(system)
|
||||||
|
|
||||||
private final val EventEnvelopeManifest = "a"
|
private final val EventEnvelopeManifest = "a"
|
||||||
|
|
@ -71,8 +71,8 @@ import akka.serialization.Serializers
|
||||||
.setOffset(offset)
|
.setOffset(offset)
|
||||||
.setOffsetManifest(offsetManifest)
|
.setOffsetManifest(offsetManifest)
|
||||||
|
|
||||||
env.eventOption.foreach(event => builder.setEvent(payloadSupport.payloadBuilder(event)))
|
env.eventOption.foreach(event => builder.setEvent(payloadBuilder(event, serialization, log)))
|
||||||
env.eventMetadata.foreach(meta => builder.setMetadata(payloadSupport.payloadBuilder(meta)))
|
env.eventMetadata.foreach(meta => builder.setMetadata(payloadBuilder(meta, serialization, log)))
|
||||||
|
|
||||||
builder.build().toByteArray()
|
builder.build().toByteArray()
|
||||||
|
|
||||||
|
|
@ -90,10 +90,10 @@ import akka.serialization.Serializers
|
||||||
val offset = fromStorageRepresentation(env.getOffset, env.getOffsetManifest)
|
val offset = fromStorageRepresentation(env.getOffset, env.getOffsetManifest)
|
||||||
|
|
||||||
val eventOption =
|
val eventOption =
|
||||||
if (env.hasEvent) Option(payloadSupport.deserializePayload(env.getEvent))
|
if (env.hasEvent) Option(deserializePayload(env.getEvent, serialization))
|
||||||
else None
|
else None
|
||||||
val metaOption =
|
val metaOption =
|
||||||
if (env.hasMetadata) Option(payloadSupport.deserializePayload(env.getMetadata))
|
if (env.hasMetadata) Option(deserializePayload(env.getMetadata, serialization))
|
||||||
else None
|
else None
|
||||||
|
|
||||||
new EventEnvelope(
|
new EventEnvelope(
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,14 @@
|
||||||
package akka.remote.serialization
|
package akka.remote.serialization
|
||||||
|
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.event.Logging
|
import akka.event.{ Logging, LoggingAdapter }
|
||||||
import akka.protobufv3.internal.ByteString
|
import akka.protobufv3.internal.ByteString
|
||||||
import akka.remote.ByteStringUtils
|
import akka.remote.ByteStringUtils
|
||||||
import akka.remote.ContainerFormats
|
import akka.remote.ContainerFormats
|
||||||
import akka.serialization.ByteBufferSerializer
|
import akka.serialization.ByteBufferSerializer
|
||||||
import akka.serialization.{ SerializationExtension, Serializers }
|
import akka.serialization.{ SerializationExtension, Serializers }
|
||||||
import akka.serialization.DisabledJavaSerializer
|
import akka.serialization.DisabledJavaSerializer
|
||||||
|
import akka.serialization.Serialization
|
||||||
import akka.serialization.SerializerWithStringManifest
|
import akka.serialization.SerializerWithStringManifest
|
||||||
|
|
||||||
import java.nio.ByteOrder
|
import java.nio.ByteOrder
|
||||||
|
|
@ -30,7 +31,25 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) {
|
||||||
* If `input` is a `Throwable` and can't be serialized because Java serialization is disabled it
|
* If `input` is a `Throwable` and can't be serialized because Java serialization is disabled it
|
||||||
* will fallback to `ThrowableNotSerializableException`.
|
* will fallback to `ThrowableNotSerializableException`.
|
||||||
*/
|
*/
|
||||||
def payloadBuilder(input: Any): ContainerFormats.Payload.Builder = {
|
def payloadBuilder(input: Any): ContainerFormats.Payload.Builder =
|
||||||
|
WrappedPayloadSupport.payloadBuilder(input, serialization, log)
|
||||||
|
|
||||||
|
def deserializePayload(payload: ContainerFormats.Payload): Any =
|
||||||
|
WrappedPayloadSupport.deserializePayload(payload, serialization)
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] object WrappedPayloadSupport {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serialize the `input` along with its `manifest` and `serializerId`.
|
||||||
|
*
|
||||||
|
* If `input` is a `Throwable` and can't be serialized because Java serialization is disabled it
|
||||||
|
* will fallback to `ThrowableNotSerializableException`.
|
||||||
|
*/
|
||||||
|
def payloadBuilder(
|
||||||
|
input: Any,
|
||||||
|
serialization: Serialization,
|
||||||
|
log: LoggingAdapter): ContainerFormats.Payload.Builder = {
|
||||||
val payload = input.asInstanceOf[AnyRef]
|
val payload = input.asInstanceOf[AnyRef]
|
||||||
val builder = ContainerFormats.Payload.newBuilder()
|
val builder = ContainerFormats.Payload.newBuilder()
|
||||||
val serializer = serialization.findSerializerFor(payload)
|
val serializer = serialization.findSerializerFor(payload)
|
||||||
|
|
@ -62,7 +81,7 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) {
|
||||||
builder
|
builder
|
||||||
}
|
}
|
||||||
|
|
||||||
def deserializePayload(payload: ContainerFormats.Payload): Any = {
|
def deserializePayload(payload: ContainerFormats.Payload, serialization: Serialization): Any = {
|
||||||
val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else ""
|
val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else ""
|
||||||
serialization.serializerByIdentity(payload.getSerializerId) match {
|
serialization.serializerByIdentity(payload.getSerializerId) match {
|
||||||
case serializer: ByteBufferSerializer =>
|
case serializer: ByteBufferSerializer =>
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue