enable misc serializers by default for Artery, #21339

* placed them in a new section additional-serialization-bindings,
  which is included by default when Artery is enabled
* can also be enabled with enable-additional-serialization-bindings
  flag to simplify usage with old remoting
* added a JavaSerializable marker trait that is bound to JavaSerializer
  in testkit, this can be used in tests so that we eventually can run
  tests without the java.io.Serializable binding
This commit is contained in:
Patrik Nordwall 2016-09-09 09:01:15 +02:00
parent 02de58392a
commit 97e0628173
12 changed files with 70 additions and 31 deletions

View file

@ -585,6 +585,19 @@ akka {
"[B" = bytes
"java.io.Serializable" = java
}
# Set this to on to enable serialization-bindings define in
# additional-serialization-bindings. Those are by default not included
# for backwards compatibility reasons. They are enabled by default if
# akka.remote.artery.enabled=on.
enable-additional-serialization-bindings = off
# Additional serialization-bindings that are replacing Java serialization are
# defined in this section and not included by default for backwards compatibility
# reasons. They can be enabled with enable-additional-serialization-bindings=on.
# They are enabled by default if akka.remote.artery.enabled=on.
additional-serialization-bindings {
}
# Log warnings when the default Java serialization is used to serialize messages.
# The default serializer uses Java serialization which is not very performant and should not

View file

@ -34,12 +34,20 @@ object Serialization {
private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
class Settings(val config: Config) {
val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
val SerializationBindings: Map[String, String] = configToMap("akka.actor.serialization-bindings")
val Serializers: Map[String, String] = configToMap(config.getConfig("akka.actor.serializers"))
val SerializationBindings: Map[String, String] = {
val defaultBindings = config.getConfig("akka.actor.serialization-bindings")
val bindings =
if (config.getBoolean("akka.actor.enable-additional-serialization-bindings") ||
config.getBoolean("akka.remote.artery.enabled"))
defaultBindings.withFallback(config.getConfig("akka.actor.additional-serialization-bindings"))
else defaultBindings
configToMap(bindings)
}
private final def configToMap(path: String): Map[String, String] = {
private final def configToMap(cfg: Config): Map[String, String] = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) (k v.toString) }
cfg.root.unwrapped.asScala.toMap map { case (k, v) (k v.toString) }
}
}

View file

@ -70,10 +70,10 @@ object MaxThroughputSpec extends MultiNodeConfig {
""")))
case object Run
sealed trait Echo extends DeadLetterSuppression
sealed trait Echo extends DeadLetterSuppression with JavaSerializable
final case object Start extends Echo
final case object End extends Echo
final case class EndResult(totalReceived: Long)
final case class EndResult(totalReceived: Long) extends JavaSerializable
final case class FlowControl(burstStartTime: Long) extends Echo
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props =

View file

@ -22,15 +22,7 @@ akka {
serialization-bindings {
"akka.actor.ActorSelectionMessage" = akka-containers
# The classes akka.actor.Identify and akka.actor.ActorIdentity serialization/deserialization are required by
# the cluster client to work.
# For the purpose of preserving protocol backward compatibility, akka.actor.Identify and akka.actor.ActorIdentity
# are stil using java serialization by default.
# Should java serialization be disabled, uncomment the following lines
# "akka.actor.Identify" = akka-misc
# "akka.actor.ActorIdentity" = akka-misc
# "scala.Some" = akka-misc
# "scala.None$" = akka-misc
"akka.remote.DaemonMsgCreate" = daemon-create
"akka.remote.artery.ArteryMessage" = artery
@ -47,6 +39,16 @@ akka {
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto
}
# For the purpose of preserving protocol backward compatibility these bindings are not
# included by default. They can be enabled with enable-additional-serialization-bindings=on.
# They are enabled by default if akka.remote.artery.enabled=on.
additional-serialization-bindings {
"akka.actor.Identify" = akka-misc
"akka.actor.ActorIdentity" = akka-misc
"scala.Some" = akka-misc
"scala.None$" = akka-misc
}
serialization-identifiers {
"akka.remote.serialization.ProtobufSerializer" = 2

View file

@ -10,10 +10,12 @@ import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.testkit.JavaSerializable
object LargeMessagesStreamSpec {
case class Ping(payload: ByteString = ByteString.empty)
case class Pong(bytesReceived: Long)
case class Ping(payload: ByteString = ByteString.empty) extends JavaSerializable
case class Pong(bytesReceived: Long) extends JavaSerializable
class EchoSize extends Actor {
def receive = {
case Ping(bytes) sender() ! Pong(bytes.size)

View file

@ -13,7 +13,7 @@ import akka.util.Timeout
import scala.concurrent.duration._
object RemoteActorForSpec {
final case class ActorForReq(s: String)
final case class ActorForReq(s: String) extends JavaSerializable
}
class RemoteActorForSpec extends ArteryMultiNodeSpec("akka.loglevel=INFO") with ImplicitSender with DefaultTimeout {

View file

@ -9,10 +9,11 @@ import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestActors }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.testkit.JavaSerializable
object RemoteActorSelectionSpec {
final case class ActorSelReq(s: String)
final case class ActorCreateReq(props: Props, name: String)
final case class ActorSelReq(s: String) extends JavaSerializable
final case class ActorCreateReq(props: Props, name: String) extends JavaSerializable
class SelectionActor extends Actor with ActorLogging {
log.info("Started")

View file

@ -37,8 +37,8 @@ object RemoteWatcherSpec {
}
object TestRemoteWatcher {
final case class AddressTerm(address: Address)
final case class Quarantined(address: Address, uid: Option[Int])
final case class AddressTerm(address: Address) extends JavaSerializable
final case class Quarantined(address: Address, uid: Option[Int]) extends JavaSerializable
}
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(

View file

@ -25,10 +25,11 @@ import akka.testkit.TestEvent
import akka.event.Logging
import akka.remote.RARP
import akka.testkit.EventFilter
import akka.testkit.JavaSerializable
object UntrustedSpec {
final case class IdentifyReq(path: String)
final case class StopChild(name: String)
final case class IdentifyReq(path: String) extends JavaSerializable
final case class StopChild(name: String) extends JavaSerializable
class Receptionist(testActor: ActorRef) extends Actor {
context.actorOf(Props(classOf[Child], testActor), "child1")

View file

@ -13,13 +13,10 @@ import com.typesafe.config.ConfigFactory
object MiscMessageSerializerSpec {
val serializationTestOverrides =
"""
|akka.actor.serialization-bindings = {
| "akka.actor.Identify" = akka-misc
| "akka.actor.ActorIdentity" = akka-misc
| "scala.Some" = akka-misc
| "scala.None$" = akka-misc
|}
""".stripMargin
akka.actor.enable-additional-serialization-bindings=on
# or they can be enabled with
# akka.remote.artery.enabled=on
"""
val testConfig = ConfigFactory.parseString(serializationTestOverrides).withFallback(AkkaSpec.testConf)
}

View file

@ -26,4 +26,8 @@ akka {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
actor.serialization-bindings {
"akka.testkit.JavaSerializable" = java
}
}

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.testkit
import java.io.Serializable
/**
* Marker trait for test messages that will use JavaSerializer.
*/
trait JavaSerializable extends Serializable