Fix serialization in TypedActor (#24851)

* fixed serialization in TypedActor
* generalized duplicates via Serialization.manifestFor
This commit is contained in:
Kirill Yankov 2018-04-12 19:58:13 +03:00 committed by Patrik Nordwall
parent e495dab941
commit 3ebb9fa9c1
18 changed files with 107 additions and 179 deletions

View file

@ -11,7 +11,7 @@ import akka.actor.TypedActor._
import akka.japi.{ Option JOption }
import akka.pattern.ask
import akka.routing.RoundRobinGroup
import akka.serialization.JavaSerializer
import akka.serialization.{ JavaSerializer, SerializerWithStringManifest }
import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, TimingTest, filterEvents }
import akka.util.Timeout
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
@ -32,6 +32,8 @@ object TypedActorSpec {
fixed-pool-size = 60
}
}
akka.actor.serializers.sample = "akka.actor.TypedActorSpec$SampleSerializerWithStringManifest$"
akka.actor.serialization-bindings."akka.actor.TypedActorSpec$WithStringSerializedClass" = sample
akka.actor.serialize-messages = off
"""
@ -106,7 +108,7 @@ object TypedActorSpec {
@throws(classOf[TimeoutException])
def read(): Int
def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected")
def testMethodCallSerialization(foo: Foo, s: String, i: Int, o: WithStringSerializedClass): Unit = throw new IllegalStateException("expected")
}
class Bar extends Foo with Serializable {
@ -200,8 +202,35 @@ object TypedActorSpec {
}
}
trait F { def f(pow: Boolean): Int }
class FI extends F { def f(pow: Boolean): Int = if (pow) throw new IllegalStateException("expected") else 1 }
trait F {
def f(pow: Boolean): Int
}
class FI extends F {
def f(pow: Boolean): Int = if (pow) throw new IllegalStateException("expected") else 1
}
object SampleSerializerWithStringManifest extends SerializerWithStringManifest {
val manifest = "M"
override def identifier: Int = 777
override def manifest(o: AnyRef): String = manifest
override def toBinary(o: AnyRef): Array[Byte] = o match {
case _: WithStringSerializedClass Array(255.toByte)
case _ throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case manifest if bytes.length == 1 && bytes(0) == 255.toByte WithStringSerializedClass()
case _ throw new IllegalArgumentException(s"Cannot deserialize object with manifest $manifest")
}
}
case class WithStringSerializedClass()
}
class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
@ -445,7 +474,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
import java.io._
val someFoo: Foo = new Bar
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int], classOf[WithStringSerializedClass]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef], WithStringSerializedClass()))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
@ -457,12 +486,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
mNew.method should ===(m.method)
mNew.parameters should have size 3
mNew.parameters should have size 4
mNew.parameters(0) should not be null
mNew.parameters(0).getClass should ===(classOf[Bar])
mNew.parameters(1) should ===(null)
mNew.parameters(2) should not be null
mNew.parameters(2).asInstanceOf[Int] should ===(1)
mNew.parameters(3) should not be null
mNew.parameters(3).asInstanceOf[WithStringSerializedClass] should ===(WithStringSerializedClass())
}
}

View file

@ -5,9 +5,8 @@
package akka.actor
import language.existentials
import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
import scala.util.{ Failure, Success, Try }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
@ -16,12 +15,13 @@ import akka.japi.{ Creator, Option ⇒ JOption }
import akka.japi.Util.{ immutableSeq, immutableSingletonSeq }
import akka.util.Timeout
import akka.util.Reflect.instantiator
import akka.serialization.{ JavaSerializer, SerializationExtension }
import akka.serialization.{ JavaSerializer, SerializationExtension, Serializers }
import akka.dispatch._
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import java.util.concurrent.TimeoutException
import java.io.ObjectStreamException
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import java.lang.reflect.{ InvocationHandler, InvocationTargetException, Method, Proxy }
import akka.pattern.AskTimeoutException
/**
@ -152,11 +152,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case ps if ps.length == 0 SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
case ps
val serialization = SerializationExtension(akka.serialization.JavaSerializer.currentSystem.value)
val serializedParameters = new Array[(Int, Class[_], Array[Byte])](ps.length)
val serializedParameters = new Array[(Int, String, Array[Byte])](ps.length)
for (i 0 until ps.length) {
val p = ps(i)
val s = serialization.findSerializerFor(p)
val m = if (s.includeManifest) p.getClass else null
val m = Serializers.manifestFor(s, p)
serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
}
@ -169,7 +169,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
*
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
private[akka] final case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) {
private[akka] final case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, String, Array[Byte])]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
@ -186,8 +186,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
val deserializedParameters: Array[AnyRef] = new Array[AnyRef](a.length) //Mutable for the sake of sanity
for (i 0 until a.length) {
val (sId, manifest, bytes) = a(i)
deserializedParameters(i) =
serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
deserializedParameters(i) = serialization.deserialize(bytes, sId, manifest).get
}
deserializedParameters

View file

@ -8,9 +8,8 @@ import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
import akka.serialization.SerializerWithStringManifest
import akka.serialization.{ SerializationExtension, Serializers }
import akka.util.{ Helpers, Unsafe }
import java.util.Optional
private[akka] object Children {
@ -252,13 +251,8 @@ private[akka] trait Children { this: ActorCell ⇒
val o = arg.asInstanceOf[AnyRef]
val serializer = ser.findSerializerFor(o)
val bytes = serializer.toBinary(o)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(o)
ser.deserialize(bytes, serializer.identifier, manifest).get != null
case _
ser.deserialize(bytes, arg.getClass).get != null
}
val ms = Serializers.manifestFor(serializer, o)
ser.deserialize(bytes, serializer.identifier, ms).get != null
})
} catch {
case NonFatal(e) throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)

View file

@ -5,22 +5,19 @@
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.AkkaException
import akka.dispatch.{ Envelope, Mailbox }
import akka.dispatch.sysmsg._
import akka.event.Logging.Error
import akka.util.Unsafe
import akka.actor._
import akka.serialization.SerializationExtension
import akka.serialization.{ DisabledJavaSerializer, SerializationExtension, Serializers }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.control.Exception.Catcher
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
import akka.serialization.SerializerWithStringManifest
import akka.dispatch.UnboundedMailbox
import akka.serialization.DisabledJavaSerializer
@SerialVersionUID(1L)
final case class SerializationCheckFailedException private (msg: Object, cause: Throwable)
@ -173,13 +170,8 @@ private[akka] trait Dispatch { this: ActorCell ⇒
obj // skip check for known "local" messages
else {
val bytes = serializer.toBinary(obj)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(obj)
s.deserialize(bytes, serializer.identifier, manifest).get
case _
s.deserialize(bytes, obj.getClass).get
}
val ms = Serializers.manifestFor(serializer, obj)
s.deserialize(bytes, serializer.identifier, ms).get
}
}

View file

@ -76,6 +76,16 @@ trait Serializer {
final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz))
}
object Serializers {
// NOTE!!! If you change this method it is likely that DaemonMsgCreateSerializer.serialize needs the changes too.
def manifestFor(s: Serializer, message: AnyRef): String = s match {
case s2: SerializerWithStringManifest s2.manifest(message)
case _ if (s.includeManifest) message.getClass.getName else ""
}
}
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object.
*

View file

@ -11,7 +11,7 @@ import java.{ lang ⇒ jl }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages cm }
import akka.cluster.metrics._
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers, SerializerWithStringManifest }
import akka.util.ClassLoaderObjectInputStream
import akka.protobuf.{ ByteString, MessageLite }
@ -122,16 +122,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS
builder.setData(ByteString.copyFrom(serializer.toBinary(selector)))
.setSerializerId(serializer.identifier)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(selector)
builder.setManifest(manifest)
case _
builder.setManifest(
if (serializer.includeManifest) selector.getClass.getName
else ""
)
}
val manifest = Serializers.manifestFor(serializer, selector)
builder.setManifest(manifest)
builder.build()
}

View file

@ -4,10 +4,9 @@
package akka.cluster.pubsub.protobuf
import akka.serialization.BaseSerializer
import akka.serialization._
import scala.collection.breakOut
import akka.actor.{ ExtendedActorSystem, Address }
import scala.Some
import akka.actor.{ Address, ExtendedActorSystem }
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import akka.protobuf.{ ByteString, MessageLite }
import java.util.zip.GZIPOutputStream
@ -17,11 +16,8 @@ import akka.cluster.pubsub.protobuf.msg.{ DistributedPubSubMessages ⇒ dm }
import scala.collection.JavaConverters._
import akka.cluster.pubsub.DistributedPubSubMediator._
import akka.cluster.pubsub.DistributedPubSubMediator.Internal._
import akka.serialization.Serialization
import akka.actor.ActorRef
import akka.serialization.SerializationExtension
import scala.collection.immutable.TreeMap
import akka.serialization.SerializerWithStringManifest
import java.io.NotSerializableException
/**
@ -228,15 +224,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor
setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m)))
.setSerializerId(msgSerializer.identifier)
msgSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(m)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (msgSerializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
}
val ms = Serializers.manifestFor(msgSerializer, m)
if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
builder.build()
}

View file

@ -10,14 +10,13 @@ import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster._
import akka.cluster.protobuf.msg.{ ClusterMessages cm }
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.serialization._
import akka.protobuf.{ ByteString, MessageLite }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.concurrent.duration.Deadline
import akka.annotation.InternalApi
import akka.cluster.InternalClusterAction._
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
@ -174,15 +173,8 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
val serializer = serialization.findSerializerFor(pool)
builder.setSerializerId(serializer.identifier)
.setData(ByteString.copyFrom(serializer.toBinary(pool)))
serializer match {
case ser: SerializerWithStringManifest
builder.setManifest(ser.manifest(pool))
case _
builder.setManifest(
if (serializer.includeManifest) pool.getClass.getName
else ""
)
}
val manifest = Serializers.manifestFor(serializer, pool)
builder.setManifest(manifest)
builder.build()
}

View file

@ -17,12 +17,9 @@ import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.cluster.UniqueAddress
import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages dm }
import akka.serialization.JSerializer
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization._
import akka.protobuf.ByteString
import akka.protobuf.MessageLite
import akka.serialization.SerializerWithStringManifest
import akka.cluster.ddata.VersionVector
/**
@ -144,15 +141,8 @@ trait SerializationSupport {
setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m)))
.setSerializerId(msgSerializer.identifier)
msgSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(m)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (msgSerializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
}
val ms = Serializers.manifestFor(msgSerializer, m)
if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
builder.build()
}

View file

@ -167,15 +167,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val serializer = serialization.findSerializerFor(payload)
val builder = mf.PersistentPayload.newBuilder()
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(payload)
if (manifest != PersistentRepr.Undefined)
builder.setPayloadManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
}
val ms = Serializers.manifestFor(serializer, payload)
if (ms.nonEmpty) builder.setPayloadManifest(ByteString.copyFromUtf8(ms))
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)

View file

@ -54,15 +54,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val out = new ByteArrayOutputStream
writeInt(out, snapshotSerializer.identifier)
snapshotSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(snapshot)
if (manifest != "")
out.write(manifest.getBytes(UTF_8))
case _
if (snapshotSerializer.includeManifest)
out.write(snapshot.getClass.getName.getBytes(UTF_8))
}
val ms = Serializers.manifestFor(snapshotSerializer, snapshot)
if (ms.nonEmpty) out.write(ms.getBytes(UTF_8))
out.toByteArray
}

View file

@ -8,10 +8,8 @@ import akka.remote.WireFormats._
import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope }
import akka.serialization.Serialization
import akka.serialization.ByteBufferSerializer
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.serialization._
import scala.util.control.NonFatal
/**
@ -46,15 +44,10 @@ private[akka] object MessageSerializer {
try {
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(message)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
}
val ms = Serializers.manifestFor(serializer, message)
if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
builder.build
} catch {
case NonFatal(e)
@ -68,21 +61,12 @@ private[akka] object MessageSerializer {
val serializer = serialization.findSerializerFor(message)
headerBuilder setSerializer serializer.identifier
def manifest: String = serializer match {
case ser: SerializerWithStringManifest ser.manifest(message)
case _ if (serializer.includeManifest) message.getClass.getName else ""
}
headerBuilder setManifest Serializers.manifestFor(serializer, message)
envelope.writeHeader(headerBuilder, outboundEnvelope)
serializer match {
case ser: ByteBufferSerializer
headerBuilder setManifest manifest
envelope.writeHeader(headerBuilder, outboundEnvelope)
ser.toBinary(message, envelope.byteBuffer)
case _
headerBuilder setManifest manifest
envelope.writeHeader(headerBuilder, outboundEnvelope)
envelope.byteBuffer.put(serializer.toBinary(message))
case ser: ByteBufferSerializer ser.toBinary(message, envelope.byteBuffer)
case _ envelope.byteBuffer.put(serializer.toBinary(message))
}
}

View file

@ -14,7 +14,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress._
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
import akka.stream._
import akka.stream.stage._
import akka.util.{ OptionVal, Unsafe }
@ -23,7 +23,6 @@ import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.serialization.SerializerWithStringManifest
/**
* INTERNAL API
@ -674,11 +673,7 @@ private[remote] class DuplicateHandshakeReq(
if (_serializerId == -1) {
val serialization = SerializationExtension(system)
val ser = serialization.serializerFor(classOf[HandshakeReq])
_manifest = ser match {
case s: SerializerWithStringManifest
s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address))
case _ ""
}
_manifest = Serializers.manifestFor(ser, HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address))
_serializerId = ser.identifier
}
}

View file

@ -14,7 +14,6 @@ import akka.routing.{ NoRouter, RouterConfig }
import scala.reflect.ClassTag
import util.{ Failure, Success }
import java.io.Serializable
/**
* Serializes Akka's internal DaemonMsgCreate using protobuf
@ -186,26 +185,23 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys
// this trixery is to retain backwards wire compatibility while at the same time
// allowing for usage of serializers with string manifests
var hasManifest = false
val hasManifest = serializer.includeManifest
val manifest = serializer match {
case ser: SerializerWithStringManifest
hasManifest = true
ser.manifest(m)
case ser
hasManifest = ser.includeManifest
case _
// we do include class name regardless to retain wire compatibility
// with older nodes who expect manifest to be the class name
if (m eq null) {
"null"
} else {
val className = m.getClass.getName
if (scala212OrLater && m.isInstanceOf[Serializable] && m.getClass.isSynthetic && className.contains("$Lambda$")) {
if (scala212OrLater && m.isInstanceOf[java.io.Serializable] && m.getClass.isSynthetic && className.contains("$Lambda$")) {
// When the additional-protobuf serializers are not enabled
// the serialization of the parameters is based on passing class name instead of
// serializerId and manifest as we usually do. With Scala 2.12 the functions are generated as
// lambdas and we can't use that load class from that name when deserializing
classOf[Serializable].getName
classOf[java.io.Serializable].getName
} else {
className
}

View file

@ -13,9 +13,7 @@ import akka.actor.SelectChildPattern
import akka.actor.SelectParent
import akka.actor.SelectionPathElement
import akka.remote.ContainerFormats
import akka.serialization.SerializationExtension
import akka.serialization.BaseSerializer
import akka.serialization.SerializerWithStringManifest
import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers }
class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
@ -39,15 +37,8 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe
setSerializerId(serializer.identifier).
setWildcardFanOut(sel.wildcardFanOut)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(message)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
}
val ms = Serializers.manifestFor(serializer, message)
if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
sel.elements.foreach {
case SelectChildName(name)

View file

@ -311,7 +311,6 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val fromBinaryMap = Map[String, Array[Byte] AnyRef](
IdentifyManifest deserializeIdentify,
ActorIdentityManifest deserializeActorIdentity,
OptionManifest deserializeOption,
StatusSuccessManifest deserializeStatusSuccess,
StatusFailureManifest deserializeStatusFailure,
ThrowableManifest throwableSupport.deserializeThrowable,

View file

@ -6,8 +6,7 @@ package akka.remote.serialization
import akka.actor.ExtendedActorSystem
import akka.remote.ContainerFormats
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.serialization.{ SerializationExtension, Serializers }
import akka.protobuf.ByteString
/**
@ -26,15 +25,8 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) {
.setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(payload)))
.setSerializerId(serializer.identifier)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(payload)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(payload.getClass.getName))
}
val ms = Serializers.manifestFor(serializer, payload)
if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))
builder
}

View file

@ -7,7 +7,7 @@ package akka.stream.serialization
import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import akka.serialization._
import akka.stream.StreamRefMessages
import akka.stream.impl.streamref._
@ -105,15 +105,8 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e
.setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(p)))
.setSerializerId(msgSerializer.identifier)
msgSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(p)
if (manifest != "")
payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (msgSerializer.includeManifest)
payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(p.getClass.getName))
}
val ms = Serializers.manifestFor(msgSerializer, p)
if (ms.nonEmpty) payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(ms))
StreamRefMessages.SequencedOnNext.newBuilder()
.setSeqNr(o.seqNr)
@ -173,10 +166,12 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) e
val d = StreamRefMessages.CumulativeDemand.parseFrom(bytes)
StreamRefsProtocol.CumulativeDemand(d.getSeqNr)
}
private def deserializeRemoteStreamCompleted(bytes: Array[Byte]): StreamRefsProtocol.RemoteStreamCompleted = {
val d = StreamRefMessages.RemoteStreamCompleted.parseFrom(bytes)
StreamRefsProtocol.RemoteStreamCompleted(d.getSeqNr)
}
private def deserializeRemoteStreamFailure(bytes: Array[Byte]): AnyRef = {
val d = StreamRefMessages.RemoteStreamFailure.parseFrom(bytes)
StreamRefsProtocol.RemoteStreamFailure(d.getCause.toStringUtf8)