diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 32f2ede8bf..df2170905e 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -15,24 +15,6 @@ import scala.reflect.BeanInfo import com.google.protobuf.Message import akka.pattern.ask -class ProtobufSerializer extends Serializer { - val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - def includeManifest: Boolean = true - def identifier = 2 - - def toBinary(obj: AnyRef): Array[Byte] = { - if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( - "Can't serialize a non-protobuf message using protobuf [" + obj + "]") - obj.asInstanceOf[Message].toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException( - "Need a protobuf message class to be able to serialize bytes using protobuf") - clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message] - } -} - object SerializeSpec { val serializationConf = ConfigFactory.parseString(""" @@ -40,11 +22,12 @@ object SerializeSpec { actor { serializers { java = "akka.serialization.JavaSerializer" + test = "akka.serialization.TestSerializer" } - + serialization-bindings { java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] + test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"] } } } @@ -56,6 +39,21 @@ object SerializeSpec { case class Person(name: String, age: Int, address: Address) { def this() = this("", 0, null) } case class Record(id: Int, person: Person) + + class SimpleMessage(s: String) extends TestSerializble + + class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s) + + trait AnotherInterface extends TestSerializble + + class AnotherMessage extends AnotherInterface + + class ExtendedAnotherMessage extends AnotherMessage + + class PlainMessage + + class ExtendedPlainMessage extends PlainMessage + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -72,7 +70,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { "have correct bindings" in { ser.bindings(addr.getClass.getName) must be("java") - ser.bindings("akka.actor.ProtobufProtocol$MyMessage") must be("proto") + ser.bindings(classOf[PlainMessage].getName) must be("test") } "serialize Address" in { @@ -145,6 +143,37 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { a.shutdown() } } + + "resove serializer by direct interface" in { + val msg = new SimpleMessage("foo") + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by interface implemented by super class" in { + val msg = new ExtendedSimpleMessage("foo", 17) + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by indirect interface" in { + val msg = new AnotherMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by indirect interface implemented by super class" in { + val msg = new ExtendedAnotherMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer for message with binding" in { + val msg = new PlainMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer for message extending class with with binding" in { + val msg = new ExtendedPlainMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + } } @@ -158,13 +187,11 @@ object VerifySerializabilitySpec { serializers { java = "akka.serialization.JavaSerializer" - proto = "akka.serialization.ProtobufSerializer" default = "akka.serialization.JavaSerializer" } serialization-bindings { java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] } } } @@ -209,3 +236,20 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) system stop a } } + +trait TestSerializble + +class TestSerializer extends Serializer { + def includeManifest: Boolean = false + + def identifier = 9999 + + def toBinary(o: AnyRef): Array[Byte] = { + Array.empty[Byte] + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, + classLoader: Option[ClassLoader] = None): AnyRef = { + null + } +} diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 750b2e5c35..e89adde8fb 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -10,6 +10,8 @@ import scala.util.DynamicVariable import com.typesafe.config.Config import akka.config.ConfigurationException import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } +import java.util.concurrent.ConcurrentHashMap +import akka.event.Logging case class NoSerializerFoundException(m: String) extends AkkaException(m) @@ -65,6 +67,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { import Serialization._ val settings = new Settings(system.settings.config) + val log = Logging(system, getClass.getName) /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration @@ -111,10 +114,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * Returns the configured Serializer for the given Class, falls back to the Serializer named "default" + * Returns the configured Serializer for the given Class, falls back to the Serializer named "default". + * It traverses interfaces and super classes to find any configured Serializer that match + * the class name. */ - def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups - serializerMap.get(clazz.getName).getOrElse(serializers("default")) + def serializerFor(clazz: Class[_]): Serializer = + if (bindings.isEmpty) { + // quick path to default when no bindings are registered + serializers("default") + } else { + + def resolve(c: Class[_]): Option[Serializer] = + serializerMap.get(c.getName) match { + case null ⇒ + val classes = c.getInterfaces ++ Option(c.getSuperclass) + classes.view map resolve collectFirst { case Some(x) ⇒ x } + case x ⇒ Some(x) + } + + serializerMap.get(clazz.getName) match { + case null ⇒ + val ser = resolve(clazz).getOrElse(serializers("default")) + // memorize the lookups for performance + serializerMap.putIfAbsent(clazz.getName, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some + } + case ser ⇒ ser + } + } /** * Tries to load the specified Serializer by the FQN @@ -146,9 +176,15 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class + * serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class */ - lazy val serializerMap: Map[String, Serializer] = bindings mapValues serializers + private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = { + val serializerMap = new ConcurrentHashMap[String, Serializer] + for ((k, v) ← bindings) { + serializerMap.put(k, serializers(v)) + } + serializerMap + } /** * Maps from a Serializer Identity (Int) to a Serializer instance (optimization) diff --git a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java index 1b917c8b5b..b68f8f2e79 100644 --- a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java @@ -91,14 +91,17 @@ public class SerializationDocTestBase { serializers { default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { java = ["java.lang.String", "app.my.Customer"] + proto = ["com.google.protobuf.Message"] myown = ["my.own.BusinessObject", "something.equally.Awesome", + "akka.docs.serialization.MyOwnSerializable" "java.lang.Boolean"] } } diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index 7140b42aac..2920538ded 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. note:: - Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``, - this means that you'll need to list the specific classes. + You only need to specify the name of an interface or abstract base class if the messages implements + that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. + +Protobuf +-------- + +Akka provides a ``Serializer`` for `protobuf `_ messages. +To use that you need to add the following to the configuration:: + + akka { + actor { + serializers { + proto = "akka.serialization.ProtobufSerializer" + } + + serialization-bindings { + proto = ["com.google.protobuf.Message"] + } + } + } Verification ------------ @@ -74,6 +92,7 @@ here's some examples: For more information, have a look at the ``ScalaDoc`` for ``akka.serialization._`` + Customization ============= diff --git a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala index 6baba425bc..7f1553f75c 100644 --- a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala @@ -104,14 +104,17 @@ class SerializationDocSpec extends AkkaSpec { serializers { default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { java = ["java.lang.String", "app.my.Customer"] + proto = ["com.google.protobuf.Message"] myown = ["my.own.BusinessObject", "something.equally.Awesome", + "akka.docs.serialization.MyOwnSerializable" "java.lang.Boolean"] } } diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 15879b2ce4..6a0867dea2 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. note:: - Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``, - this means that you'll need to list the specific classes. + You only need to specify the name of an interface or abstract base class if the messages implements + that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. + +Protobuf +-------- + +Akka provides a ``Serializer`` for `protobuf `_ messages. +To use that you need to add the following to the configuration:: + + akka { + actor { + serializers { + proto = "akka.serialization.ProtobufSerializer" + } + + serialization-bindings { + proto = ["com.google.protobuf.Message"] + } + } + } Verification ------------