From 3ebb9fa9c1f7ec14be963661a9efbcc5ae223248 Mon Sep 17 00:00:00 2001 From: Kirill Yankov Date: Thu, 12 Apr 2018 19:58:13 +0300 Subject: [PATCH] Fix serialization in TypedActor (#24851) * fixed serialization in TypedActor * generalized duplicates via Serialization.manifestFor --- .../scala/akka/actor/TypedActorSpec.scala | 43 ++++++++++++++++--- .../main/scala/akka/actor/TypedActor.scala | 17 ++++---- .../scala/akka/actor/dungeon/Children.scala | 14 ++---- .../scala/akka/actor/dungeon/Dispatch.scala | 14 ++---- .../scala/akka/serialization/Serializer.scala | 10 +++++ .../metrics/protobuf/MessageSerializer.scala | 15 ++----- .../DistributedPubSubMessageSerializer.scala | 19 ++------ .../protobuf/ClusterMessageSerializer.scala | 14 ++---- .../ddata/protobuf/SerializationSupport.scala | 16 ++----- .../serialization/MessageSerializer.scala | 11 +---- .../serialization/SnapshotSerializer.scala | 11 +---- .../scala/akka/remote/MessageSerializer.scala | 36 +++++----------- .../scala/akka/remote/artery/Codecs.scala | 9 +--- .../DaemonMsgCreateSerializer.scala | 12 ++---- .../MessageContainerSerializer.scala | 15 ++----- .../serialization/MiscMessageSerializer.scala | 1 - .../serialization/WrappedPayloadSupport.scala | 14 ++---- .../serialization/StreamRefSerializer.scala | 15 +++---- 18 files changed, 107 insertions(+), 179 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index eca94dc452..031496b029 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -11,7 +11,7 @@ import akka.actor.TypedActor._ import akka.japi.{ Option ⇒ JOption } import akka.pattern.ask import akka.routing.RoundRobinGroup -import akka.serialization.JavaSerializer +import akka.serialization.{ JavaSerializer, SerializerWithStringManifest } import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, TimingTest, filterEvents } import akka.util.Timeout import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } @@ -32,6 +32,8 @@ object TypedActorSpec { fixed-pool-size = 60 } } + akka.actor.serializers.sample = "akka.actor.TypedActorSpec$SampleSerializerWithStringManifest$" + akka.actor.serialization-bindings."akka.actor.TypedActorSpec$WithStringSerializedClass" = sample akka.actor.serialize-messages = off """ @@ -106,7 +108,7 @@ object TypedActorSpec { @throws(classOf[TimeoutException]) def read(): Int - def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected") + def testMethodCallSerialization(foo: Foo, s: String, i: Int, o: WithStringSerializedClass): Unit = throw new IllegalStateException("expected") } class Bar extends Foo with Serializable { @@ -200,8 +202,35 @@ object TypedActorSpec { } } - trait F { def f(pow: Boolean): Int } - class FI extends F { def f(pow: Boolean): Int = if (pow) throw new IllegalStateException("expected") else 1 } + trait F { + def f(pow: Boolean): Int + } + + class FI extends F { + def f(pow: Boolean): Int = if (pow) throw new IllegalStateException("expected") else 1 + } + + object SampleSerializerWithStringManifest extends SerializerWithStringManifest { + + val manifest = "M" + + override def identifier: Int = 777 + + override def manifest(o: AnyRef): String = manifest + + override def toBinary(o: AnyRef): Array[Byte] = o match { + case _: WithStringSerializedClass ⇒ Array(255.toByte) + case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case manifest if bytes.length == 1 && bytes(0) == 255.toByte ⇒ WithStringSerializedClass() + case _ ⇒ throw new IllegalArgumentException(s"Cannot deserialize object with manifest $manifest") + } + } + + case class WithStringSerializedClass() + } class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) @@ -445,7 +474,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) import java.io._ val someFoo: Foo = new Bar JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) { - val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) + val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int], classOf[WithStringSerializedClass]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef], WithStringSerializedClass())) val baos = new ByteArrayOutputStream(8192 * 4) val out = new ObjectOutputStream(baos) @@ -457,12 +486,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] mNew.method should ===(m.method) - mNew.parameters should have size 3 + mNew.parameters should have size 4 mNew.parameters(0) should not be null mNew.parameters(0).getClass should ===(classOf[Bar]) mNew.parameters(1) should ===(null) mNew.parameters(2) should not be null mNew.parameters(2).asInstanceOf[Int] should ===(1) + mNew.parameters(3) should not be null + mNew.parameters(3).asInstanceOf[WithStringSerializedClass] should ===(WithStringSerializedClass()) } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 6e86877efb..24933c9a9a 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -5,9 +5,8 @@ package akka.actor import language.existentials - import scala.util.control.NonFatal -import scala.util.{ Try, Success, Failure } +import scala.util.{ Failure, Success, Try } import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -16,12 +15,13 @@ import akka.japi.{ Creator, Option ⇒ JOption } import akka.japi.Util.{ immutableSeq, immutableSingletonSeq } import akka.util.Timeout import akka.util.Reflect.instantiator -import akka.serialization.{ JavaSerializer, SerializationExtension } +import akka.serialization.{ JavaSerializer, SerializationExtension, Serializers } import akka.dispatch._ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import java.util.concurrent.TimeoutException import java.io.ObjectStreamException -import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } +import java.lang.reflect.{ InvocationHandler, InvocationTargetException, Method, Proxy } + import akka.pattern.AskTimeoutException /** @@ -152,11 +152,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array()) case ps ⇒ val serialization = SerializationExtension(akka.serialization.JavaSerializer.currentSystem.value) - val serializedParameters = new Array[(Int, Class[_], Array[Byte])](ps.length) + val serializedParameters = new Array[(Int, String, Array[Byte])](ps.length) for (i ← 0 until ps.length) { val p = ps(i) val s = serialization.findSerializerFor(p) - val m = if (s.includeManifest) p.getClass else null + val m = Serializers.manifestFor(s, p) serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity } @@ -169,7 +169,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi * * Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call */ - private[akka] final case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) { + private[akka] final case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, String, Array[Byte])]) { //TODO implement writeObject and readObject to serialize //TODO Possible optimization is to special encode the parameter-types to conserve space @@ -186,8 +186,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi val deserializedParameters: Array[AnyRef] = new Array[AnyRef](a.length) //Mutable for the sake of sanity for (i ← 0 until a.length) { val (sId, manifest, bytes) = a(i) - deserializedParameters(i) = - serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest)) + deserializedParameters(i) = serialization.deserialize(bytes, sId, manifest).get } deserializedParameters diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 80c7972bd7..1f22a70f67 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -8,9 +8,8 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import scala.collection.immutable import akka.actor._ -import akka.serialization.SerializationExtension -import akka.util.{ Unsafe, Helpers } -import akka.serialization.SerializerWithStringManifest +import akka.serialization.{ SerializationExtension, Serializers } +import akka.util.{ Helpers, Unsafe } import java.util.Optional private[akka] object Children { @@ -252,13 +251,8 @@ private[akka] trait Children { this: ActorCell ⇒ val o = arg.asInstanceOf[AnyRef] val serializer = ser.findSerializerFor(o) val bytes = serializer.toBinary(o) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(o) - ser.deserialize(bytes, serializer.identifier, manifest).get != null - case _ ⇒ - ser.deserialize(bytes, arg.getClass).get != null - } + val ms = Serializers.manifestFor(serializer, o) + ser.deserialize(bytes, serializer.identifier, ms).get != null }) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 6b4b823721..3d087f624c 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,22 +5,19 @@ package akka.actor.dungeon import scala.annotation.tailrec - import akka.AkkaException import akka.dispatch.{ Envelope, Mailbox } import akka.dispatch.sysmsg._ import akka.event.Logging.Error import akka.util.Unsafe import akka.actor._ -import akka.serialization.SerializationExtension +import akka.serialization.{ DisabledJavaSerializer, SerializationExtension, Serializers } import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue -import akka.serialization.SerializerWithStringManifest import akka.dispatch.UnboundedMailbox -import akka.serialization.DisabledJavaSerializer @SerialVersionUID(1L) final case class SerializationCheckFailedException private (msg: Object, cause: Throwable) @@ -173,13 +170,8 @@ private[akka] trait Dispatch { this: ActorCell ⇒ obj // skip check for known "local" messages else { val bytes = serializer.toBinary(obj) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(obj) - s.deserialize(bytes, serializer.identifier, manifest).get - case _ ⇒ - s.deserialize(bytes, obj.getClass).get - } + val ms = Serializers.manifestFor(serializer, obj) + s.deserialize(bytes, serializer.identifier, ms).get } } diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 8312139de2..dbaba879e9 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -76,6 +76,16 @@ trait Serializer { final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz)) } +object Serializers { + + // NOTE!!! If you change this method it is likely that DaemonMsgCreateSerializer.serialize needs the changes too. + def manifestFor(s: Serializer, message: AnyRef): String = s match { + case s2: SerializerWithStringManifest ⇒ s2.manifest(message) + case _ ⇒ if (s.includeManifest) message.getClass.getName else "" + } + +} + /** * A Serializer represents a bimap between an object and an array of bytes representing that object. * diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 608e9bc71b..3dd9f79703 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -11,7 +11,7 @@ import java.{ lang ⇒ jl } import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm } import akka.cluster.metrics._ -import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest } +import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers, SerializerWithStringManifest } import akka.util.ClassLoaderObjectInputStream import akka.protobuf.{ ByteString, MessageLite } @@ -122,16 +122,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS builder.setData(ByteString.copyFrom(serializer.toBinary(selector))) .setSerializerId(serializer.identifier) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(selector) - builder.setManifest(manifest) - case _ ⇒ - builder.setManifest( - if (serializer.includeManifest) selector.getClass.getName - else "" - ) - } + val manifest = Serializers.manifestFor(serializer, selector) + builder.setManifest(manifest) + builder.build() } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index d68600c1f5..c7eaee2088 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -4,10 +4,9 @@ package akka.cluster.pubsub.protobuf -import akka.serialization.BaseSerializer +import akka.serialization._ import scala.collection.breakOut -import akka.actor.{ ExtendedActorSystem, Address } -import scala.Some +import akka.actor.{ Address, ExtendedActorSystem } import java.io.{ ByteArrayInputStream, ByteArrayOutputStream } import akka.protobuf.{ ByteString, MessageLite } import java.util.zip.GZIPOutputStream @@ -17,11 +16,8 @@ import akka.cluster.pubsub.protobuf.msg.{ DistributedPubSubMessages ⇒ dm } import scala.collection.JavaConverters._ import akka.cluster.pubsub.DistributedPubSubMediator._ import akka.cluster.pubsub.DistributedPubSubMediator.Internal._ -import akka.serialization.Serialization import akka.actor.ActorRef -import akka.serialization.SerializationExtension import scala.collection.immutable.TreeMap -import akka.serialization.SerializerWithStringManifest import java.io.NotSerializableException /** @@ -228,15 +224,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m))) .setSerializerId(msgSerializer.identifier) - msgSerializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(m) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (msgSerializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName)) - } + val ms = Serializers.manifestFor(msgSerializer, m) + if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms)) builder.build() } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index aec54cd5b3..1419f6409b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -10,14 +10,13 @@ import java.util.zip.{ GZIPInputStream, GZIPOutputStream } import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster._ import akka.cluster.protobuf.msg.{ ClusterMessages ⇒ cm } -import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest } +import akka.serialization._ import akka.protobuf.{ ByteString, MessageLite } import scala.annotation.tailrec import scala.collection.immutable import scala.collection.JavaConverters._ import scala.concurrent.duration.Deadline - import akka.annotation.InternalApi import akka.cluster.InternalClusterAction._ import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } @@ -174,15 +173,8 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se val serializer = serialization.findSerializerFor(pool) builder.setSerializerId(serializer.identifier) .setData(ByteString.copyFrom(serializer.toBinary(pool))) - serializer match { - case ser: SerializerWithStringManifest ⇒ - builder.setManifest(ser.manifest(pool)) - case _ ⇒ - builder.setManifest( - if (serializer.includeManifest) pool.getClass.getName - else "" - ) - } + val manifest = Serializers.manifestFor(serializer, pool) + builder.setManifest(manifest) builder.build() } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 4cb0214f4f..8cdd4ae502 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -17,12 +17,9 @@ import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.cluster.UniqueAddress import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ dm } -import akka.serialization.JSerializer -import akka.serialization.Serialization -import akka.serialization.SerializationExtension +import akka.serialization._ import akka.protobuf.ByteString import akka.protobuf.MessageLite -import akka.serialization.SerializerWithStringManifest import akka.cluster.ddata.VersionVector /** @@ -144,15 +141,8 @@ trait SerializationSupport { setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m))) .setSerializerId(msgSerializer.identifier) - msgSerializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(m) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (msgSerializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName)) - } + val ms = Serializers.manifestFor(msgSerializer, m) + if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms)) builder.build() } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index beaaccd30b..4dd7c88832 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -167,15 +167,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val serializer = serialization.findSerializerFor(payload) val builder = mf.PersistentPayload.newBuilder() - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(payload) - if (manifest != PersistentRepr.Undefined) - builder.setPayloadManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName)) - } + val ms = Serializers.manifestFor(serializer, payload) + if (ms.nonEmpty) builder.setPayloadManifest(ByteString.copyFromUtf8(ms)) builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) builder.setSerializerId(serializer.identifier) diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 202a49cb19..df1c5413ed 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -54,15 +54,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer val out = new ByteArrayOutputStream writeInt(out, snapshotSerializer.identifier) - snapshotSerializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(snapshot) - if (manifest != "") - out.write(manifest.getBytes(UTF_8)) - case _ ⇒ - if (snapshotSerializer.includeManifest) - out.write(snapshot.getClass.getName.getBytes(UTF_8)) - } + val ms = Serializers.manifestFor(snapshotSerializer, snapshot) + if (ms.nonEmpty) out.write(ms.getBytes(UTF_8)) out.toByteArray } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index c5c9a7504a..1777d88d7d 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -8,10 +8,8 @@ import akka.remote.WireFormats._ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope } -import akka.serialization.Serialization -import akka.serialization.ByteBufferSerializer -import akka.serialization.SerializationExtension -import akka.serialization.SerializerWithStringManifest +import akka.serialization._ + import scala.util.control.NonFatal /** @@ -46,15 +44,10 @@ private[akka] object MessageSerializer { try { builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) builder.setSerializerId(serializer.identifier) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(message) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) - } + + val ms = Serializers.manifestFor(serializer, message) + if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms)) + builder.build } catch { case NonFatal(e) ⇒ @@ -68,21 +61,12 @@ private[akka] object MessageSerializer { val serializer = serialization.findSerializerFor(message) headerBuilder setSerializer serializer.identifier - - def manifest: String = serializer match { - case ser: SerializerWithStringManifest ⇒ ser.manifest(message) - case _ ⇒ if (serializer.includeManifest) message.getClass.getName else "" - } + headerBuilder setManifest Serializers.manifestFor(serializer, message) + envelope.writeHeader(headerBuilder, outboundEnvelope) serializer match { - case ser: ByteBufferSerializer ⇒ - headerBuilder setManifest manifest - envelope.writeHeader(headerBuilder, outboundEnvelope) - ser.toBinary(message, envelope.byteBuffer) - case _ ⇒ - headerBuilder setManifest manifest - envelope.writeHeader(headerBuilder, outboundEnvelope) - envelope.byteBuffer.put(serializer.toBinary(message)) + case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) + case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index ef27b3eee7..f3816754f8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -14,7 +14,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress._ import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress } -import akka.serialization.{ Serialization, SerializationExtension } +import akka.serialization.{ Serialization, SerializationExtension, Serializers } import akka.stream._ import akka.stream.stage._ import akka.util.{ OptionVal, Unsafe } @@ -23,7 +23,6 @@ import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import akka.remote.artery.OutboundHandshake.HandshakeReq -import akka.serialization.SerializerWithStringManifest /** * INTERNAL API @@ -674,11 +673,7 @@ private[remote] class DuplicateHandshakeReq( if (_serializerId == -1) { val serialization = SerializationExtension(system) val ser = serialization.serializerFor(classOf[HandshakeReq]) - _manifest = ser match { - case s: SerializerWithStringManifest ⇒ - s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address)) - case _ ⇒ "" - } + _manifest = Serializers.manifestFor(ser, HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address)) _serializerId = ser.identifier } } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index 5f48f7ed47..b107bcd2a7 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -14,7 +14,6 @@ import akka.routing.{ NoRouter, RouterConfig } import scala.reflect.ClassTag import util.{ Failure, Success } -import java.io.Serializable /** * Serializes Akka's internal DaemonMsgCreate using protobuf @@ -186,26 +185,23 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys // this trixery is to retain backwards wire compatibility while at the same time // allowing for usage of serializers with string manifests - var hasManifest = false + val hasManifest = serializer.includeManifest val manifest = serializer match { case ser: SerializerWithStringManifest ⇒ - hasManifest = true ser.manifest(m) - case ser ⇒ - hasManifest = ser.includeManifest - + case _ ⇒ // we do include class name regardless to retain wire compatibility // with older nodes who expect manifest to be the class name if (m eq null) { "null" } else { val className = m.getClass.getName - if (scala212OrLater && m.isInstanceOf[Serializable] && m.getClass.isSynthetic && className.contains("$Lambda$")) { + if (scala212OrLater && m.isInstanceOf[java.io.Serializable] && m.getClass.isSynthetic && className.contains("$Lambda$")) { // When the additional-protobuf serializers are not enabled // the serialization of the parameters is based on passing class name instead of // serializerId and manifest as we usually do. With Scala 2.12 the functions are generated as // lambdas and we can't use that load class from that name when deserializing - classOf[Serializable].getName + classOf[java.io.Serializable].getName } else { className } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index 8c7a4655ac..3f0e3da5d3 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -13,9 +13,7 @@ import akka.actor.SelectChildPattern import akka.actor.SelectParent import akka.actor.SelectionPathElement import akka.remote.ContainerFormats -import akka.serialization.SerializationExtension -import akka.serialization.BaseSerializer -import akka.serialization.SerializerWithStringManifest +import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers } class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer { @@ -39,15 +37,8 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe setSerializerId(serializer.identifier). setWildcardFanOut(sel.wildcardFanOut) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(message) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) - } + val ms = Serializers.manifestFor(serializer, message) + if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms)) sel.elements.foreach { case SelectChildName(name) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala index ba208cb291..38c6ff35ee 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala @@ -311,7 +311,6 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef]( IdentifyManifest → deserializeIdentify, ActorIdentityManifest → deserializeActorIdentity, - OptionManifest → deserializeOption, StatusSuccessManifest → deserializeStatusSuccess, StatusFailureManifest → deserializeStatusFailure, ThrowableManifest → throwableSupport.deserializeThrowable, diff --git a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala index 098496c157..8abacf9cdb 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala @@ -6,8 +6,7 @@ package akka.remote.serialization import akka.actor.ExtendedActorSystem import akka.remote.ContainerFormats -import akka.serialization.SerializationExtension -import akka.serialization.SerializerWithStringManifest +import akka.serialization.{ SerializationExtension, Serializers } import akka.protobuf.ByteString /** @@ -26,15 +25,8 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) { .setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(payload))) .setSerializerId(serializer.identifier) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(payload) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(payload.getClass.getName)) - } + val ms = Serializers.manifestFor(serializer, payload) + if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms)) builder } diff --git a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala index 1641fc9896..3ed2a6b9f9 100644 --- a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala +++ b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala @@ -7,7 +7,7 @@ package akka.stream.serialization import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi -import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest } +import akka.serialization._ import akka.stream.StreamRefMessages import akka.stream.impl.streamref._ @@ -105,15 +105,8 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e .setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(p))) .setSerializerId(msgSerializer.identifier) - msgSerializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(p) - if (manifest != "") - payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (msgSerializer.includeManifest) - payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(p.getClass.getName)) - } + val ms = Serializers.manifestFor(msgSerializer, p) + if (ms.nonEmpty) payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(ms)) StreamRefMessages.SequencedOnNext.newBuilder() .setSeqNr(o.seqNr) @@ -173,10 +166,12 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e val d = StreamRefMessages.CumulativeDemand.parseFrom(bytes) StreamRefsProtocol.CumulativeDemand(d.getSeqNr) } + private def deserializeRemoteStreamCompleted(bytes: Array[Byte]): StreamRefsProtocol.RemoteStreamCompleted = { val d = StreamRefMessages.RemoteStreamCompleted.parseFrom(bytes) StreamRefsProtocol.RemoteStreamCompleted(d.getSeqNr) } + private def deserializeRemoteStreamFailure(bytes: Array[Byte]): AnyRef = { val d = StreamRefMessages.RemoteStreamFailure.parseFrom(bytes) StreamRefsProtocol.RemoteStreamFailure(d.getCause.toStringUtf8)