Configure serializer with class as key. See #1789

This commit is contained in:
Patrik Nordwall 2012-02-06 21:12:26 +01:00
parent d7435547ff
commit 1dbce49359
9 changed files with 207 additions and 245 deletions

View file

@ -8,14 +8,21 @@ import akka.AkkaException
import akka.util.ReflectiveAccess
import scala.util.DynamicVariable
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging
import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException
case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
/**
* Tuple that represents mapping from Class to Serializer
*/
type ClassSerializer = (Class[_], Serializer)
/**
* This holds a reference to the current ActorSystem (the surrounding context)
* during serialization and deserialization.
@ -40,28 +47,19 @@ object Serialization {
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] =
getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
val Serializers: Map[String, String] = configToMap(getConfig("akka.actor.serializers"))
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
hasPath(configPath) match {
case false Map()
case true
val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).root.unwrapped.asScala.toMap.map {
case (k: String, v: java.util.Collection[_]) (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
case invalid throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
}
serializationBindings
val SerializationBindings: Map[String, String] = configToMap(getConfig("akka.actor.serialization-bindings"))
private def configToMap(cfg: Config): Map[String, String] =
cfg.root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
}
}
}
}
/**
* Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
* locating a Serializer for a particular class as defined in the mapping in the configuration.
*/
class Serialization(val system: ExtendedActorSystem) extends Extension {
import Serialization._
@ -105,8 +103,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
} catch { case e: Exception Left(e) }
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null,
* falls back to the Serializer named "default"
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
*
* @throws akka.config.ConfigurationException if no `serialization-bindings` is configured for the
* class of the object
*/
def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer
@ -114,82 +114,86 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
}
/**
* Returns the configured Serializer for the given Class, falls back to the Serializer named "default".
* It traverses interfaces and super classes to find any configured Serializer that match
* the class name.
* Returns the configured Serializer for the given Class. The configured Serializer
* is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
* the configured class is a super class or implemented interface. In case of
* ambiguity it is primarily using the most specific configured class,
* and secondly the entry configured first.
*
* @throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class
*/
def serializerFor(clazz: Class[_]): Serializer =
if (bindings.isEmpty) {
// quick path to default when no bindings are registered
serializers("default")
} else {
def resolve(c: Class[_]): Option[Serializer] =
serializerMap.get(c.getName) match {
case null
val classes = c.getInterfaces ++ Option(c.getSuperclass)
classes.view map resolve collectFirst { case Some(x) x }
case x Some(x)
serializerMap.get(clazz) match {
case null
val ser = bindings.find { case (c, s) c.isAssignableFrom(clazz) } match {
case None throw new NotSerializableException(
"No configured serialization-bindings for class [%s]" format clazz.getName)
case Some((c, s)) s
}
serializerMap.get(clazz.getName) match {
case null
val ser = resolve(clazz).getOrElse(serializers("default"))
// memorize the lookups for performance
serializerMap.putIfAbsent(clazz.getName, ser) match {
case null
log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
ser
case some some
}
case ser ser
}
// memorize for performance
serializerMap.putIfAbsent(clazz, ser) match {
case null
log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
ser
case some some
}
case ser ser
}
/**
* Tries to load the specified Serializer by the FQN
* Tries to instantiate the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader)
/**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
* But "default" can be overridden in config
* By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer
*/
lazy val serializers: Map[String, Serializer] = {
val serializersConf = settings.Serializers
for ((k: String, v: String) serializersConf)
private val serializers: Map[String, Serializer] = {
for ((k: String, v: String) settings.Serializers)
yield k -> serializerOf(v).fold(throw _, identity)
}
/**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
* bindings is a Seq of tuple representing the mapping from Class to Serializer.
* It is primarily ordered by the most specific classes first, and secondly in the configured order.
*/
lazy val bindings: Map[String, String] = {
settings.SerializationBindings.foldLeft(Map[String, String]()) {
//All keys which are lists, take the Strings from them and Map them
case (result, (k: String, vs: Seq[_])) result ++ (vs collect { case v: String (v, k) })
//For any other values, just skip them
case (result, _) result
private[akka] val bindings: Seq[ClassSerializer] = {
val configuredBindings = for ((k: String, v: String) settings.SerializationBindings) yield {
val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, (c: Class[_]) c)
(c, serializers(v))
}
sort(configuredBindings)
}
/**
* serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class
* Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort).
*/
private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = {
val serializerMap = new ConcurrentHashMap[String, Serializer]
for ((k, v) bindings) {
serializerMap.put(k, serializers(v))
private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] =
(new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca)
buf.indexWhere(_._1 isAssignableFrom ca._1) match {
case -1 buf append ca
case x buf insert (x, ca)
}
buf
}
/**
* serializerMap is a Map whose keys is the class that is serializable and values is the serializer
* to be used for that class.
*/
private val serializerMap: ConcurrentHashMap[Class[_], Serializer] = {
val serializerMap = new ConcurrentHashMap[Class[_], Serializer]
for ((c, s) bindings) serializerMap.put(c, s)
serializerMap
}
/**
* Maps from a Serializer Identity (Int) to a Serializer instance (optimization)
*/
lazy val serializerByIdentity: Map[Int, Serializer] =
val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
}