Fix DaemonMsgCreateSerializer #22225
This commit is contained in:
parent
ad4164d5e3
commit
734c31fa4f
6 changed files with 812 additions and 158 deletions
|
|
@ -4,13 +4,14 @@
|
|||
|
||||
package akka.remote.serialization
|
||||
|
||||
import akka.serialization.{ BaseSerializer, SerializationExtension }
|
||||
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
|
||||
import akka.protobuf.ByteString
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import akka.actor.{ Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope }
|
||||
import akka.remote.DaemonMsgCreate
|
||||
import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData }
|
||||
import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData, SerializedMessage }
|
||||
import akka.routing.{ NoRouter, RouterConfig }
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
import util.{ Failure, Success }
|
||||
import java.io.Serializable
|
||||
|
|
@ -24,24 +25,16 @@ import java.io.Serializable
|
|||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
|
||||
private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
|
||||
import ProtobufSerializer.serializeActorRef
|
||||
import ProtobufSerializer.deserializeActorRef
|
||||
import Deploy.NoDispatcherGiven
|
||||
|
||||
@deprecated("Use constructor with ExtendedActorSystem", "2.4")
|
||||
def this() = this(null)
|
||||
|
||||
private val scala212OrLater = !scala.util.Properties.versionNumberString.startsWith("2.11")
|
||||
|
||||
// TODO remove this when deprecated this() is removed
|
||||
override val identifier: Int =
|
||||
if (system eq null) 3
|
||||
else identifierFromConfig
|
||||
private lazy val serialization = SerializationExtension(system)
|
||||
|
||||
def includeManifest: Boolean = false
|
||||
|
||||
lazy val serialization = SerializationExtension(system)
|
||||
override val includeManifest: Boolean = false
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
|
|
@ -49,11 +42,11 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
def deployProto(d: Deploy): DeployData = {
|
||||
val builder = DeployData.newBuilder.setPath(d.path)
|
||||
if (d.config != ConfigFactory.empty)
|
||||
builder.setConfig(serialize(d.config))
|
||||
builder.setConfig(oldSerialize(d.config))
|
||||
if (d.routerConfig != NoRouter)
|
||||
builder.setRouterConfig(serialize(d.routerConfig))
|
||||
builder.setRouterConfig(oldSerialize(d.routerConfig))
|
||||
if (d.scope != NoScopeGiven)
|
||||
builder.setScope(serialize(d.scope))
|
||||
builder.setScope(oldSerialize(d.scope))
|
||||
if (d.dispatcher != NoDispatcherGiven)
|
||||
builder.setDispatcher(d.dispatcher)
|
||||
builder.build
|
||||
|
|
@ -63,22 +56,12 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
val builder = PropsData.newBuilder
|
||||
.setClazz(props.clazz.getName)
|
||||
.setDeploy(deployProto(props.deploy))
|
||||
props.args map serialize foreach builder.addArgs
|
||||
props.args.map { a ⇒
|
||||
val argClassName =
|
||||
if (a == null) "null"
|
||||
else {
|
||||
val className = a.getClass.getName
|
||||
if (scala212OrLater && a.getClass.isInstanceOf[Serializable] && a.getClass.isSynthetic &&
|
||||
className.contains("$Lambda$")) {
|
||||
// The serialization of the parameters is based on passing class name instead of
|
||||
// serializerId and manifest as we usually do. With Scala 2.12 the functions are generated as
|
||||
// lambdas and we can't use that load class from that name when deserializing.
|
||||
classOf[Serializable].getName
|
||||
} else
|
||||
className
|
||||
}
|
||||
builder.addClasses(argClassName)
|
||||
props.args.foreach { arg ⇒
|
||||
val (serializerId, hasManifest, manifest, bytes) = serialize(arg)
|
||||
builder.addArgs(ByteString.copyFrom(bytes))
|
||||
builder.addManifests(manifest)
|
||||
builder.addSerializerIds(serializerId)
|
||||
builder.addHasManifest(hasManifest)
|
||||
}
|
||||
builder.build
|
||||
}
|
||||
|
|
@ -100,13 +83,13 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
|
||||
def deploy(protoDeploy: DeployData): Deploy = {
|
||||
val config =
|
||||
if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config])
|
||||
if (protoDeploy.hasConfig) oldDeserialize(protoDeploy.getConfig, classOf[Config])
|
||||
else ConfigFactory.empty
|
||||
val routerConfig =
|
||||
if (protoDeploy.hasRouterConfig) deserialize(protoDeploy.getRouterConfig, classOf[RouterConfig])
|
||||
if (protoDeploy.hasRouterConfig) oldDeserialize(protoDeploy.getRouterConfig, classOf[RouterConfig])
|
||||
else NoRouter
|
||||
val scope =
|
||||
if (protoDeploy.hasScope) deserialize(protoDeploy.getScope, classOf[Scope])
|
||||
if (protoDeploy.hasScope) oldDeserialize(protoDeploy.getScope, classOf[Scope])
|
||||
else NoScopeGiven
|
||||
val dispatcher =
|
||||
if (protoDeploy.hasDispatcher) protoDeploy.getDispatcher
|
||||
|
|
@ -116,10 +99,29 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
|
||||
def props = {
|
||||
import scala.collection.JavaConverters._
|
||||
val clazz = system.dynamicAccess.getClassFor[AnyRef](proto.getProps.getClazz).get
|
||||
val args: Vector[AnyRef] = (proto.getProps.getArgsList.asScala zip proto.getProps.getClassesList.asScala)
|
||||
.map(deserialize)(collection.breakOut)
|
||||
Props(deploy(proto.getProps.getDeploy), clazz, args)
|
||||
val protoProps = proto.getProps
|
||||
val actorClass = system.dynamicAccess.getClassFor[AnyRef](protoProps.getClazz).get
|
||||
val args: Vector[AnyRef] =
|
||||
// message from a newer node always contains serializer ids and possibly a string manifest for each position
|
||||
if (protoProps.getSerializerIdsCount > 0) {
|
||||
for {
|
||||
idx ← (0 until protoProps.getSerializerIdsCount).toVector
|
||||
} yield {
|
||||
val manifest =
|
||||
if (protoProps.getHasManifest(idx)) protoProps.getManifests(idx)
|
||||
else ""
|
||||
serialization.deserializeByteBuffer(
|
||||
protoProps.getArgs(idx).asReadOnlyByteBuffer(),
|
||||
protoProps.getSerializerIds(idx),
|
||||
manifest)
|
||||
}
|
||||
} else {
|
||||
// message from an older node, which only provides data and class name
|
||||
// and never any serializer ids
|
||||
(proto.getProps.getArgsList.asScala zip proto.getProps.getManifestsList.asScala)
|
||||
.map(oldDeserialize)(collection.breakOut)
|
||||
}
|
||||
Props(deploy(proto.getProps.getDeploy), actorClass, args)
|
||||
}
|
||||
|
||||
DaemonMsgCreate(
|
||||
|
|
@ -129,13 +131,51 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
supervisor = deserializeActorRef(system, proto.getSupervisor))
|
||||
}
|
||||
|
||||
protected def serialize(any: Any): ByteString = ByteString.copyFrom(serialization.serialize(any.asInstanceOf[AnyRef]).get)
|
||||
private def oldSerialize(any: Any): ByteString = ByteString.copyFrom(serialization.serialize(any.asInstanceOf[AnyRef]).get)
|
||||
|
||||
protected def deserialize(p: (ByteString, String)): AnyRef =
|
||||
if (p._1.isEmpty && p._2 == "null") null
|
||||
else deserialize(p._1, system.dynamicAccess.getClassFor[AnyRef](p._2).get)
|
||||
private def serialize(any: Any): (Int, Boolean, String, Array[Byte]) = {
|
||||
val m = any.asInstanceOf[AnyRef]
|
||||
val serializer = serialization.findSerializerFor(m)
|
||||
|
||||
protected def deserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
|
||||
// this trixery is to retain backwards wire compatibility while at the same time
|
||||
// allowing for usage of serializers with string manifests
|
||||
var hasManifest = false
|
||||
val manifest = serializer match {
|
||||
case ser: SerializerWithStringManifest ⇒
|
||||
hasManifest = true
|
||||
ser.manifest(m)
|
||||
case ser ⇒
|
||||
hasManifest = ser.includeManifest
|
||||
|
||||
// we do include class name regardless to retain wire compatibility
|
||||
// with older nodes who expect manifest to be the class name
|
||||
if (m eq null) {
|
||||
"null"
|
||||
} else {
|
||||
val className = m.getClass.getName
|
||||
if (scala212OrLater && m.isInstanceOf[Serializable] && m.getClass.isSynthetic && className.contains("$Lambda$")) {
|
||||
// When the additional-protobuf serializers are not enabled
|
||||
// the serialization of the parameters is based on passing class name instead of
|
||||
// serializerId and manifest as we usually do. With Scala 2.12 the functions are generated as
|
||||
// lambdas and we can't use that load class from that name when deserializing
|
||||
classOf[Serializable].getName
|
||||
} else {
|
||||
className
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(serializer.identifier, hasManifest, manifest, serializer.toBinary(m))
|
||||
}
|
||||
|
||||
private def oldDeserialize(p: (ByteString, String)): AnyRef =
|
||||
oldDeserialize(p._1, p._2)
|
||||
|
||||
private def oldDeserialize(data: ByteString, className: String): AnyRef =
|
||||
if (data.isEmpty && className == "null") null
|
||||
else oldDeserialize(data, system.dynamicAccess.getClassFor[AnyRef](className).get)
|
||||
|
||||
private def oldDeserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
|
||||
val bytes = data.toByteArray
|
||||
serialization.deserialize(bytes, clazz) match {
|
||||
case Success(x: T) ⇒ x
|
||||
|
|
@ -152,4 +192,5 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue