diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala index f103c1eba6..55ef42cded 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala @@ -5,19 +5,23 @@ package akka.cluster.sharding.typed.internal import java.io.NotSerializableException - import akka.annotation.InternalApi import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.internal.protobuf.ShardingMessages +import akka.protobufv3.internal.CodedOutputStream import akka.remote.serialization.WrappedPayloadSupport import akka.serialization.BaseSerializer +import akka.serialization.ByteBufferSerializer import akka.serialization.SerializerWithStringManifest +import java.nio.ByteBuffer + /** * INTERNAL API */ @InternalApi private[akka] class ShardingSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest + with ByteBufferSerializer with BaseSerializer { private val payloadSupport = new WrappedPayloadSupport(system) @@ -52,4 +56,28 @@ import akka.serialization.SerializerWithStringManifest s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } + // buffer based avoiding a copy for artery + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = o match { + case env: ShardingEnvelope[_] => + val builder = ShardingMessages.ShardingEnvelope.newBuilder() + builder.setEntityId(env.entityId) + builder.setMessage(payloadSupport.payloadBuilder(env.message)) + val codedOutputStream = CodedOutputStream.newInstance(buf) + builder.build().writeTo(codedOutputStream) + codedOutputStream.flush() + case _ => + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = manifest match { + case ShardingEnvelopeManifest => + val env = ShardingMessages.ShardingEnvelope.parseFrom(buf) + val entityId = env.getEntityId + val wrappedMsg = payloadSupport.deserializePayload(env.getMessage) + ShardingEnvelope(entityId, wrappedMsg) + case _ => + throw new NotSerializableException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + } + } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala index 3eb10204b3..b64603c08e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala @@ -5,13 +5,15 @@ package akka.cluster.sharding.typed import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.cluster.sharding.typed.internal.ShardingSerializer import akka.serialization.SerializationExtension +import java.nio.ByteBuffer +import java.nio.ByteOrder + class ShardingSerializerSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { "The typed ShardingSerializer" must { @@ -24,6 +26,15 @@ class ShardingSerializerSpec extends ScalaTestWithActorTestKit with AnyWordSpecL val blob = serializer.toBinary(obj) val ref = serializer.fromBinary(blob, serializer.manifest(obj)) ref should ===(obj) + + val buffer = ByteBuffer.allocate(128) + buffer.order(ByteOrder.LITTLE_ENDIAN) + serializer.toBinary(obj, buffer) + buffer.flip() + val refFromBuf = serializer.fromBinary(buffer, serializer.manifest(obj)) + refFromBuf should ===(obj) + buffer.clear() + case s => throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}") } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala index 3b76a5e426..23cc62413d 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala @@ -9,8 +9,12 @@ import akka.event.Logging import akka.protobufv3.internal.ByteString import akka.remote.ByteStringUtils import akka.remote.ContainerFormats +import akka.serialization.ByteBufferSerializer import akka.serialization.{ SerializationExtension, Serializers } import akka.serialization.DisabledJavaSerializer +import akka.serialization.SerializerWithStringManifest + +import java.nio.ByteOrder /** * INTERNAL API @@ -48,19 +52,31 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) { if (manifest.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) case _ => + // already zero copy of the serialized byte string so no point in going via bytebuf even if supported here builder .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(payload))) .setSerializerId(serializer.identifier) val manifest = Serializers.manifestFor(serializer, payload) if (manifest.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) } - builder } def deserializePayload(payload: ContainerFormats.Payload): Any = { val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else "" - serialization.deserialize(payload.getEnclosedMessage.toByteArray, payload.getSerializerId, manifest).get + serialization.serializerByIdentity(payload.getSerializerId) match { + case serializer: ByteBufferSerializer => + // may avoid one copy of the serialized payload if the proto byte is the right kind and the + // underlying payload serializer handles byte buffers + val buffer = payload.getEnclosedMessage.asReadOnlyByteBuffer() + buffer.order(ByteOrder.LITTLE_ENDIAN) + serializer.fromBinary(buffer, manifest) + case serializer: SerializerWithStringManifest => + serializer.fromBinary(payload.getEnclosedMessage.toByteArray, manifest) + case _ => + // only old class based manifest serializers? + serialization.deserialize(payload.getEnclosedMessage.toByteArray, payload.getSerializerId, manifest).get + } } }