From b72ca927992d7c2d510154b65b017af5a0a2dd99 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 20 Dec 2016 15:27:32 +0100 Subject: [PATCH] serialize Scala 2.12 functions for when used in remote deployment, #22041 --- .../akka/serialization/SerializeSpec.scala | 10 ++++++++++ .../DaemonMsgCreateSerializer.scala | 20 ++++++++++++++++++- .../DaemonMsgCreateSerializerSpec.scala | 18 +++++++++++++++-- project/Dependencies.scala | 4 ++-- 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 13ab64e359..8e4d2e56cd 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -331,6 +331,15 @@ class ReferenceSerializationSpec extends AkkaSpec(SerializationTests.mostlyRefer intercept[NotSerializableException] { ser.serializerFor(classOf[Object]) } } + "serialize function with JavaSerializer" in { + val f = (i: Int) ⇒ i + 1 + val serializer = ser.serializerFor(f.getClass) + serializer.getClass should ===(classOf[JavaSerializer]) + val bytes = ser.serialize(f).get + val f2 = ser.deserialize(bytes, serializer.identifier, "").get.asInstanceOf[Function1[Int, Int]] + f2(3) should ===(4) + } + } } @@ -424,6 +433,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR "626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78700000" + "00007070") } + } } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index 0878195572..11806c1fb3 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -13,6 +13,7 @@ import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData } import akka.routing.{ NoRouter, RouterConfig } import scala.reflect.ClassTag import util.{ Failure, Success } +import java.io.Serializable /** * Serializes Akka's internal DaemonMsgCreate using protobuf @@ -31,6 +32,8 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e @deprecated("Use constructor with ExtendedActorSystem", "2.4") def this() = this(null) + private val scala212OrLater = !scala.util.Properties.versionNumberString.startsWith("2.11") + // TODO remove this when deprecated this() is removed override val identifier: Int = if (system eq null) 3 @@ -61,7 +64,22 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e .setClazz(props.clazz.getName) .setDeploy(deployProto(props.deploy)) props.args map serialize foreach builder.addArgs - props.args map (a ⇒ if (a == null) "null" else a.getClass.getName) foreach builder.addClasses + props.args.map { a ⇒ + val argClassName = + if (a == null) "null" + else { + val className = a.getClass.getName + if (scala212OrLater && a.getClass.isInstanceOf[Serializable] && a.getClass.isSynthetic && + className.contains("$Lambda$")) { + // The serialization of the parameters is based on passing class name instead of + // serializerId and manifest as we usually do. With Scala 2.12 the functions are generated as + // lambdas and we can't use that load class from that name when deserializing. + classOf[Serializable].getName + } else + className + } + builder.addClasses(argClassName) + } builder.build } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala index 5cf688d05b..967fa90765 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala @@ -11,7 +11,6 @@ import akka.testkit.AkkaSpec import akka.actor.{ Actor, Address, Props, Deploy, OneForOneStrategy, SupervisorStrategy } import akka.remote.{ DaemonMsgCreate, RemoteScope } import akka.routing.{ RoundRobinPool, FromConfig } -import akka.util.IgnoreForScala212 import scala.concurrent.duration._ object DaemonMsgCreateSerializerSpec { @@ -22,6 +21,10 @@ object DaemonMsgCreateSerializerSpec { class MyActorWithParam(ignore: String) extends Actor { def receive = Actor.emptyBehavior } + + class MyActorWithFunParam(fun: Function1[Int, Int]) extends Actor { + def receive = Actor.emptyBehavior + } } class DaemonMsgCreateSerializerSpec extends AkkaSpec { @@ -56,7 +59,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { } } - "serialize and de-serialize DaemonMsgCreate with function creator" taggedAs IgnoreForScala212 in { + "serialize and de-serialize DaemonMsgCreate with function creator" in { verifySerialization { DaemonMsgCreate( props = Props(new MyActor), @@ -66,6 +69,16 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { } } + "serialize and de-serialize DaemonMsgCreate with FromClassCreator, with function parameters for Props" in { + verifySerialization { + DaemonMsgCreate( + props = Props(classOf[MyActorWithFunParam], (i: Int) ⇒ i + 1), + deploy = Deploy(), + path = "foo", + supervisor = supervisor) + } + } + "serialize and de-serialize DaemonMsgCreate with Deploy and RouterConfig" in { verifySerialization { // Duration.Inf doesn't equal Duration.Inf, so we use another for test @@ -103,6 +116,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { got.props.args zip expected.props.args foreach { case (g, e) ⇒ if (e.isInstanceOf[Function0[_]]) () + else if (e.isInstanceOf[Function1[_, _]]) () else g should ===(e) } got.props.deploy should ===(expected.props.deploy) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 78e25afb1d..e45362bb54 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,8 +17,8 @@ object Dependencies { val sslConfigVersion = "0.2.1" val Versions = Seq( - crossScalaVersions := Seq("2.11.8"), // "2.12.0" - scalaVersion := crossScalaVersions.value.head, + crossScalaVersions := Seq("2.11.8", "2.12.1"), + scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head), scalaStmVersion := sys.props.get("akka.build.scalaStmVersion").getOrElse("0.8"), scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse( if (scalaVersion.value.startsWith("2.12")) "1.13.4" // does not work for 2.11