pekko/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala

250 lines
9.1 KiB
Scala
Raw Normal View History

/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.serialization
Various scala-2.13.0-M5 fixes fix akka-actor-tests compile errors some tests still fail though Fix test failures in akka-actor-test Manually work arround missing implicit Factory[Nothing, Seq[Nothing]] see https://github.com/scala/scala-collection-compat/issues/137 akka-remote scalafix changes Fix shutdownAll compile error test:akka-remote scalafix changes akka-multi-node-testkit scalafix Fix akka-remote-tests multi-jvm compile errors akka-stream-tests/test:scalafix Fix test:akka-stream-tests Crude implementation of ByteString.map scalafix akka-actor-typed, akka-actor-typed-tests akka-actor-typed-tests compile and succeed scalafix akka-camel scalafix akka-cluster akka-cluster compile & test scalafix akka-cluster-metrics Fix akka-cluster-metrics scalafix akka-cluster-tools akka-cluster-tools compile and test scalafix akka-distributed-data akka-distributed-data fixes scalafix akka-persistence scalafix akka-cluster-sharding fix akka-cluster-sharding scalafix akka-contrib Fix akka-cluster-sharding-typed test scalafix akka-docs Use scala-stm 0.9 (released for M5) akka-docs Remove dependency on collections-compat Cherry-pick the relevant constructs to our own private utils Shorten 'scala.collections.immutable' by importing it Duplicate 'immutable' imports Use 'foreach' on futures Replace MapLike with regular Map Internal API markers Simplify ccompat by moving PackageShared into object Since we don't currently need to differentiate between 2.11 and Avoid relying on 'union' (and ++) being left-biased Fix akka-actor/doc by removing -Ywarn-unused Make more things more private Copyright headers Use 'unsorted' to go from SortedSet to Set Duplicate import Use onComplete rather than failed.foreach Clarify why we partly duplicate scala-collection-compat
2018-11-22 16:18:10 +01:00
import scala.collection.immutable
2017-02-21 16:08:16 +01:00
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.protobuf.ByteString
2016-02-22 20:18:15 +01:00
import akka.actor.{ Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope }
import akka.remote.DaemonMsgCreate
import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData }
import akka.routing.{ NoRouter, RouterConfig }
import com.typesafe.config.{ Config, ConfigFactory }
Various scala-2.13.0-M5 fixes fix akka-actor-tests compile errors some tests still fail though Fix test failures in akka-actor-test Manually work arround missing implicit Factory[Nothing, Seq[Nothing]] see https://github.com/scala/scala-collection-compat/issues/137 akka-remote scalafix changes Fix shutdownAll compile error test:akka-remote scalafix changes akka-multi-node-testkit scalafix Fix akka-remote-tests multi-jvm compile errors akka-stream-tests/test:scalafix Fix test:akka-stream-tests Crude implementation of ByteString.map scalafix akka-actor-typed, akka-actor-typed-tests akka-actor-typed-tests compile and succeed scalafix akka-camel scalafix akka-cluster akka-cluster compile & test scalafix akka-cluster-metrics Fix akka-cluster-metrics scalafix akka-cluster-tools akka-cluster-tools compile and test scalafix akka-distributed-data akka-distributed-data fixes scalafix akka-persistence scalafix akka-cluster-sharding fix akka-cluster-sharding scalafix akka-contrib Fix akka-cluster-sharding-typed test scalafix akka-docs Use scala-stm 0.9 (released for M5) akka-docs Remove dependency on collections-compat Cherry-pick the relevant constructs to our own private utils Shorten 'scala.collections.immutable' by importing it Duplicate 'immutable' imports Use 'foreach' on futures Replace MapLike with regular Map Internal API markers Simplify ccompat by moving PackageShared into object Since we don't currently need to differentiate between 2.11 and Avoid relying on 'union' (and ++) being left-biased Fix akka-actor/doc by removing -Ywarn-unused Make more things more private Copyright headers Use 'unsorted' to go from SortedSet to Set Duplicate import Use onComplete rather than failed.foreach Clarify why we partly duplicate scala-collection-compat
2018-11-22 16:18:10 +01:00
import akka.util.ccompat._
2017-02-21 16:08:16 +01:00
import scala.reflect.ClassTag
import util.{ Failure, Success }
/**
 * Serializes Akka's internal DaemonMsgCreate using protobuf
 * for the core structure of DaemonMsgCreate, Props and Deploy.
 * Serialization of contained RouterConfig, Config, and Scope
 * is done with configured serializer for those classes, by
 * default java.io.Serializable.
 *
 * INTERNAL API
 */
2019-04-16 20:26:09 +02:00
@ccompatUsedUntil213
2017-02-21 16:08:16 +01:00
private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
import Deploy.NoDispatcherGiven
2017-02-21 16:08:16 +01:00
private lazy val serialization = SerializationExtension(system)
2017-02-21 16:08:16 +01:00
override val includeManifest: Boolean = false
/**
 * Encodes a `DaemonMsgCreate` as the bytes of a `DaemonMsgCreateData` protobuf message.
 *
 * The props (class name, deploy and constructor arguments), the deploy, the path and the
 * supervisor reference are written. Nested values are turned into bytes through [[serialize]].
 *
 * @param obj the message to encode
 * @return the protobuf encoded message
 * @throws IllegalArgumentException if `obj` is not a `DaemonMsgCreate`
 */
def toBinary(obj: AnyRef): Array[Byte] = obj match {
  case DaemonMsgCreate(props, deploy, path, supervisor) =>
    // Converts a Deploy into its protobuf form. The config is always written, while the
    // router config, scope and dispatcher are only written when they differ from the
    // NoRouter / NoScopeGiven / NoDispatcherGiven markers.
    def deployProto(d: Deploy): DeployData = {
      val out = DeployData.newBuilder.setPath(d.path)

      val (configSerId, _, configManifest, configBytes) = serialize(d.config)
      out.setConfigSerializerId(configSerId)
      out.setConfigManifest(configManifest)
      out.setConfig(ByteString.copyFrom(configBytes))

      if (d.routerConfig != NoRouter) {
        val (routerSerId, _, routerManifest, routerBytes) = serialize(d.routerConfig)
        out.setRouterConfigSerializerId(routerSerId)
        out.setRouterConfigManifest(routerManifest)
        out.setRouterConfig(ByteString.copyFrom(routerBytes))
      }

      if (d.scope != NoScopeGiven) {
        val (scopeSerId, _, scopeManifest, scopeBytes) = serialize(d.scope)
        out.setScopeSerializerId(scopeSerId)
        out.setScopeManifest(scopeManifest)
        out.setScope(ByteString.copyFrom(scopeBytes))
      }

      if (d.dispatcher != NoDispatcherGiven)
        out.setDispatcher(d.dispatcher)

      out.build
    }

    // Props are encoded first: actor class name, the props' own deploy, and then, for each
    // constructor argument, its bytes, manifest, serializer id and hasManifest flag (all four
    // lists are appended to in step, so one index addresses one argument).
    val propsData: PropsData = {
      val out = PropsData.newBuilder.setClazz(props.clazz.getName).setDeploy(deployProto(props.deploy))
      for (arg <- props.args) {
        val (serializerId, hasManifest, manifest, bytes) = serialize(arg)
        out.addArgs(ByteString.copyFrom(bytes))
        out.addManifests(manifest)
        out.addSerializerIds(serializerId)
        out.addHasManifest(hasManifest)
      }
      out.build
    }
    val deployData = deployProto(deploy)

    DaemonMsgCreateData.newBuilder
      .setProps(propsData)
      .setDeploy(deployData)
      .setPath(path)
      .setSupervisor(serializeActorRef(supervisor))
      .build
      .toByteArray

  case _ =>
    val reason =
      "Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [%s]".format(obj)
    throw new IllegalArgumentException(reason)
}
/**
 * Parses `bytes` as a `DaemonMsgCreateData` protobuf message and rebuilds the
 * `DaemonMsgCreate` it describes.
 *
 * Two encodings are understood for the nested values: one where a serializer id (and
 * manifest) accompanies the payload, and the old wire format where only the payload and a
 * class name are present, which is handled by `oldDeserialize`.
 *
 * @param bytes the protobuf encoded message
 * @param clazz not used by this implementation
 * @return the decoded `DaemonMsgCreate`
 */
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
  val proto = DaemonMsgCreateData.parseFrom(bytes)

  // Decodes a payload that was written together with its serializer id and manifest;
  // a failed deserialization is rethrown by the `.get`.
  def decode(payload: ByteString, serializerId: Int, manifest: String): AnyRef =
    serialization.deserialize(payload.toByteArray, serializerId, manifest).get

  // Rebuilds a Deploy; every absent optional field falls back to its "not given" marker.
  def toDeploy(data: DeployData): Deploy = {
    val config =
      if (!data.hasConfig) ConfigFactory.empty
      else if (data.hasConfigSerializerId)
        decode(data.getConfig, data.getConfigSerializerId, data.getConfigManifest).asInstanceOf[Config]
      else oldDeserialize(data.getConfig, classOf[Config]) // old wire format

    val routerConfig =
      if (!data.hasRouterConfig) NoRouter
      else if (data.hasRouterConfigSerializerId)
        decode(data.getRouterConfig, data.getRouterConfigSerializerId, data.getRouterConfigManifest)
          .asInstanceOf[RouterConfig]
      else oldDeserialize(data.getRouterConfig, classOf[RouterConfig]) // old wire format

    val scope =
      if (!data.hasScope) NoScopeGiven
      else if (data.hasScopeSerializerId)
        decode(data.getScope, data.getScopeSerializerId, data.getScopeManifest).asInstanceOf[Scope]
      else oldDeserialize(data.getScope, classOf[Scope]) // old wire format

    val dispatcher =
      if (data.hasDispatcher) data.getDispatcher
      else NoDispatcherGiven

    Deploy(data.getPath, config, routerConfig, scope, dispatcher)
  }

  // Rebuilds the Props: loads the actor class by name, then decodes the constructor arguments.
  def decodeProps: Props = {
    import scala.collection.JavaConverters._

    val propsData = proto.getProps
    val actorClass = system.dynamicAccess.getClassFor[AnyRef](propsData.getClazz).get
    val serializerIdCount = propsData.getSerializerIdsCount

    val args: Vector[AnyRef] =
      if (serializerIdCount > 0) {
        // message from a newer node always contains serializer ids and possibly a string
        // manifest for each position
        Vector.tabulate(serializerIdCount) { idx =>
          val manifest =
            if (propsData.getHasManifest(idx)) propsData.getManifests(idx)
            else ""
          decode(propsData.getArgs(idx), propsData.getSerializerIds(idx), manifest)
        }
      } else {
        // message from an older node, which only provides data and class name
        // and never any serializer ids
        propsData.getArgsList.asScala
          .zip(propsData.getManifestsList.asScala)
          .iterator
          .map { case (data, className) => oldDeserialize(data, className) }
          .to(immutable.Vector)
      }

    Props(toDeploy(propsData.getDeploy), actorClass, args)
  }

  DaemonMsgCreate(
    props = decodeProps,
    deploy = toDeploy(proto.getDeploy),
    path = proto.getPath,
    supervisor = deserializeActorRef(system, proto.getSupervisor))
}
2017-02-21 16:08:16 +01:00
/**
 * Serializes `any` with the serializer that the serialization extension selects for it.
 *
 * @param any the value to serialize
 * @return a tuple of (serializer identifier, the serializer's `includeManifest` flag,
 *         manifest string, serialized bytes)
 */
private def serialize(any: Any): (Int, Boolean, String, Array[Byte]) = {
  val ref = any.asInstanceOf[AnyRef]
  val chosen = serialization.findSerializerFor(ref)

  // Reporting includeManifest separately from the manifest string is what retains backwards
  // wire compatibility while still allowing serializers with string manifests.
  val hasManifest = chosen.includeManifest

  // The class name is included regardless of includeManifest, to retain wire compatibility
  // with older nodes who expect the manifest to be the class name.
  def classNameManifest: String = ref match {
    case null => "null"
    case lambda: java.io.Serializable
        if lambda.getClass.isSynthetic && lambda.getClass.getName.contains("$Lambda$") =>
      // When the additional-protobuf serializers are not enabled the parameters are
      // transferred by class name instead of serializerId and manifest. With Scala 2.12
      // functions are generated as lambdas, and such a class cannot be loaded from its
      // name when deserializing, so java.io.Serializable is reported instead.
      classOf[java.io.Serializable].getName
    case other => other.getClass.getName
  }

  val manifest = chosen match {
    case withStringManifest: SerializerWithStringManifest => withStringManifest.manifest(ref)
    case _ => classNameManifest
  }

  (chosen.identifier, hasManifest, manifest, chosen.toBinary(ref))
}
/**
 * Tuple variant of the old wire format decoding: forwards the payload (first element)
 * and the class name (second element) to the two-argument overload.
 */
private def oldDeserialize(p: (ByteString, String)): AnyRef = {
  val payload = p._1
  val className = p._2
  oldDeserialize(payload, className)
}
2017-02-21 16:08:16 +01:00
/**
 * Decodes an old wire format payload that is identified by a class name.
 *
 * An empty payload combined with the class name "null" stands for `null`. Anything else is
 * decoded as the class that `className` resolves to through the system's dynamic access;
 * a class that cannot be resolved makes the `.get` throw.
 */
private def oldDeserialize(data: ByteString, className: String): AnyRef = {
  if (!data.isEmpty || className != "null")
    oldDeserialize(data, system.dynamicAccess.getClassFor[AnyRef](className).get)
  else
    null
}
2017-02-21 16:08:16 +01:00
/**
 * Decodes an old wire format payload as an instance of `clazz`.
 *
 * @param data  the serialized payload
 * @param clazz the class used to look up the serializer and to check the result against
 * @return the deserialized value
 * @throws IllegalArgumentException if deserialization succeeds but the value is not a `T`
 */
private def oldDeserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
  val payload = data.toByteArray

  // Fallback to the java serializer, because some interfaces don't implement
  // java.io.Serializable, but the impl instance does. This could be optimized by adding
  // java serializers in reference.conf:
  // com.typesafe.config.Config
  // akka.routing.RouterConfig
  // akka.actor.Scope
  def viaJavaSerializable(firstFailure: Throwable): T =
    serialization.deserialize(payload, classOf[java.io.Serializable]) match {
      case Success(value: T) => value
      case _ => throw firstFailure // the first exception
    }

  serialization.deserialize(payload, clazz) match {
    case Failure(cause) => viaJavaSerializable(cause)
    case Success(value: T) => value
    case Success(unexpected) =>
      throw new IllegalArgumentException(
        "Can't deserialize to [%s], got [%s]".format(clazz.getName, unexpected))
  }
}
2017-02-21 16:08:16 +01:00
2013-01-09 01:47:48 +01:00
}