diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/internal/QuerySerializer.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/internal/QuerySerializer.scala index e4c250de5b..e2030a19ad 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/internal/QuerySerializer.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/internal/QuerySerializer.scala @@ -9,10 +9,9 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.time.Instant import java.util.Base64 import java.util.UUID - import scala.util.control.NonFatal - import akka.annotation.InternalApi +import akka.event.Logging import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.Sequence @@ -20,7 +19,7 @@ import akka.persistence.query.TimeBasedUUID import akka.persistence.query.TimestampOffset import akka.persistence.query.internal.protobuf.QueryMessages import akka.persistence.query.typed.EventEnvelope -import akka.remote.serialization.WrappedPayloadSupport +import akka.remote.serialization.WrappedPayloadSupport.{ deserializePayload, payloadBuilder } import akka.serialization.BaseSerializer import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest @@ -35,7 +34,8 @@ import akka.serialization.Serializers extends SerializerWithStringManifest with BaseSerializer { - private lazy val payloadSupport = new WrappedPayloadSupport(system) + private val log = Logging(system, classOf[QuerySerializer]) + private lazy val serialization = SerializationExtension(system) private final val EventEnvelopeManifest = "a" @@ -71,8 +71,8 @@ import akka.serialization.Serializers .setOffset(offset) .setOffsetManifest(offsetManifest) - env.eventOption.foreach(event => builder.setEvent(payloadSupport.payloadBuilder(event))) - env.eventMetadata.foreach(meta => builder.setMetadata(payloadSupport.payloadBuilder(meta))) + env.eventOption.foreach(event => builder.setEvent(payloadBuilder(event, serialization, log))) + env.eventMetadata.foreach(meta => builder.setMetadata(payloadBuilder(meta, serialization, log))) builder.build().toByteArray() @@ -90,10 +90,10 @@ import akka.serialization.Serializers val offset = fromStorageRepresentation(env.getOffset, env.getOffsetManifest) val eventOption = - if (env.hasEvent) Option(payloadSupport.deserializePayload(env.getEvent)) + if (env.hasEvent) Option(deserializePayload(env.getEvent, serialization)) else None val metaOption = - if (env.hasMetadata) Option(payloadSupport.deserializePayload(env.getMetadata)) + if (env.hasMetadata) Option(deserializePayload(env.getMetadata, serialization)) else None new EventEnvelope( diff --git a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala index 23cc62413d..80da8873d4 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala @@ -5,13 +5,14 @@ package akka.remote.serialization import akka.actor.ExtendedActorSystem -import akka.event.Logging +import akka.event.{ Logging, LoggingAdapter } import akka.protobufv3.internal.ByteString import akka.remote.ByteStringUtils import akka.remote.ContainerFormats import akka.serialization.ByteBufferSerializer import akka.serialization.{ SerializationExtension, Serializers } import akka.serialization.DisabledJavaSerializer +import akka.serialization.Serialization import akka.serialization.SerializerWithStringManifest import java.nio.ByteOrder @@ -30,7 +31,25 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) { * If `input` is a `Throwable` and can't be serialized because Java serialization is disabled it * will fallback to `ThrowableNotSerializableException`. */ - def payloadBuilder(input: Any): ContainerFormats.Payload.Builder = { + def payloadBuilder(input: Any): ContainerFormats.Payload.Builder = + WrappedPayloadSupport.payloadBuilder(input, serialization, log) + + def deserializePayload(payload: ContainerFormats.Payload): Any = + WrappedPayloadSupport.deserializePayload(payload, serialization) +} + +private[akka] object WrappedPayloadSupport { + + /** + * Serialize the `input` along with its `manifest` and `serializerId`. + * + * If `input` is a `Throwable` and can't be serialized because Java serialization is disabled it + * will fallback to `ThrowableNotSerializableException`. + */ + def payloadBuilder( + input: Any, + serialization: Serialization, + log: LoggingAdapter): ContainerFormats.Payload.Builder = { val payload = input.asInstanceOf[AnyRef] val builder = ContainerFormats.Payload.newBuilder() val serializer = serialization.findSerializerFor(payload) @@ -62,7 +81,7 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) { builder } - def deserializePayload(payload: ContainerFormats.Payload): Any = { + def deserializePayload(payload: ContainerFormats.Payload, serialization: Serialization): Any = { val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else "" serialization.serializerByIdentity(payload.getSerializerId) match { case serializer: ByteBufferSerializer =>