pekko/akka-actor/src/main/scala/akka/serialization/Serialization.scala

196 lines
7.3 KiB
Scala
Raw Normal View History

/**
 * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.serialization
import akka.AkkaException
import akka.util.ReflectiveAccess
import scala.util.DynamicVariable
import com.typesafe.config.Config
2011-11-24 18:53:18 +01:00
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging
/**
 * Exception carrying the message `m`, which is forwarded to the AkkaException constructor.
 *
 * NOTE(review): the name suggests it signals that no Serializer could be located for a class,
 * but it is not thrown anywhere in the visible portion of this file -- confirm against callers.
 */
case class NoSerializerFoundException(m: String) extends AkkaException(m)
2011-11-24 18:53:18 +01:00
object Serialization {

  /**
   * This holds a reference to the current ActorSystem (the surrounding context)
   * during serialization and deserialization.
   *
   * If you are using Serializers yourself, outside of SerializationExtension,
   * you'll need to surround the serialization/deserialization with:
   *
   * {{{
   * currentSystem.withValue(system) {
   *   ...code...
   * }
   * }}}
   *
   * The initial (unbound) value is `null`.
   */
  val currentSystem: DynamicVariable[ActorSystem] = new DynamicVariable[ActorSystem](null)

  /**
   * This holds a reference to the current transport address to be inserted
   * into local actor refs during serialization.
   *
   * The initial (unbound) value is `null`.
   */
  val currentTransportAddress: DynamicVariable[Address] = new DynamicVariable[Address](null)

  /**
   * Typed view of the serialization-related parts of the given configuration.
   *
   * @param config the configuration the settings below are read from
   */
  class Settings(val config: Config) {

    import scala.collection.JavaConverters._
    import config._

    /**
     * Every entry found under `akka.actor.serializers`, with the value converted
     * to a String via `toString`.
     */
    val Serializers: Map[String, String] =
      getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) => (k, v.toString) }

    /**
     * Every entry found under `akka.actor.serialization-bindings`, or an empty Map
     * when that path is not present in the configuration.
     *
     * Each entry must have a String key and a `java.util.Collection` value;
     * anything else is rejected with a ConfigurationException.
     */
    val SerializationBindings: Map[String, Seq[String]] = {
      val configPath = "akka.actor.serialization-bindings"
      if (hasPath(configPath)) {
        val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).root.unwrapped.asScala.toMap.map {
          // The element type is erased, so the cast is unchecked; consumers must not assume
          // that every element really is a String.
          case (k: String, v: java.util.Collection[_]) => (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
          case invalid                                 => throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
        }
        serializationBindings
      } else Map()
    }
  }
}
/**
* Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/
class Serialization(val system: ExtendedActorSystem) extends Extension {

  import Serialization._

  /** Serialization settings read from the configuration of `system`. */
  val settings = new Settings(system.settings.config)

  /** Logger named after this class; used to report which serializer gets cached for a class. */
  val log = Logging(system, getClass.getName)

  /**
   * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
   * to either an Array of Bytes or an Exception if one was thrown.
   *
   * NOTE(review): unlike the `deserialize` methods, this call does not bind `currentSystem`.
   *
   * @param o the object to serialize; `null` is handled by the NullSerializer
   * @return `Right` with the bytes, or `Left` with the Exception thrown while locating
   *         the serializer or while serializing
   */
  def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
    try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception => Left(e) }

  /**
   * Deserializes the given array of bytes using the specified serializer id,
   * using the optional type hint to the Serializer and the optional ClassLoader to load it into.
   * Returns either the resulting object or an Exception if one was thrown.
   *
   * `currentSystem` is bound to `system` for the duration of the call.
   *
   * @param bytes        the serialized representation
   * @param serializerId identity of the serializer, looked up in `serializerByIdentity`
   * @param clazz        optional type hint handed to the serializer
   * @param classLoader  class loader handed to the serializer; a `null` reference is passed on as `None`
   */
  def deserialize(bytes: Array[Byte],
                  serializerId: Int,
                  clazz: Option[Class[_]],
                  classLoader: ClassLoader): Either[Exception, AnyRef] =
    try {
      currentSystem.withValue(system) {
        // Option(...) rather than Some(...): a null class loader must become None, not Some(null).
        Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Option(classLoader)))
      }
    } catch { case e: Exception => Left(e) }

  /**
   * Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
   * You can specify an optional ClassLoader to load the object into.
   * Returns either the resulting object or an Exception if one was thrown.
   *
   * `currentSystem` is bound to `system` for the duration of the call.
   *
   * @param bytes       the serialized representation
   * @param clazz       the type used to select the serializer; also passed to it as the type hint
   * @param classLoader optional class loader handed to the serializer
   */
  def deserialize(
    bytes: Array[Byte],
    clazz: Class[_],
    classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
    try {
      currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) }
    } catch { case e: Exception => Left(e) }

  /**
   * Returns the Serializer configured for the given object, returns the NullSerializer if it's null,
   * falls back to the Serializer named "default"
   */
  def findSerializerFor(o: AnyRef): Serializer = o match {
    case null  => NullSerializer
    case other => serializerFor(other.getClass)
  }

  /**
   * Returns the configured Serializer for the given Class, falls back to the Serializer named "default".
   * It traverses interfaces and super classes to find any configured Serializer that match
   * the class name.
   *
   * The outcome of a lookup is stored in `serializerMap` under the class name, so later
   * lookups for the same class are answered directly from that map.
   */
  def serializerFor(clazz: Class[_]): Serializer =
    if (bindings.isEmpty) {
      // quick path to default when no bindings are registered
      serializers("default")
    } else {
      // Searches c itself, then recursively its interfaces followed by its superclass.
      // getSuperclass may return null, hence the Option wrapper.
      def resolve(c: Class[_]): Option[Serializer] =
        serializerMap.get(c.getName) match {
          case null =>
            val classes = c.getInterfaces ++ Option(c.getSuperclass)
            // the view keeps the search lazy: it stops at the first hit
            classes.view map resolve collectFirst { case Some(x) => x }
          case x => Some(x)
        }

      serializerMap.get(clazz.getName) match {
        case null =>
          val ser = resolve(clazz).getOrElse(serializers("default"))
          // memorize the lookups for performance
          serializerMap.putIfAbsent(clazz.getName, ser) match {
            case null =>
              // this call stored the entry
              log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
              ser
            case some => some // an entry was stored concurrently; use that one
          }
        case ser => ser
      }
    }

  /**
   * Tries to load the specified Serializer by the FQN
   *
   * The instance is created reflectively without constructor arguments, using the
   * internal class loader of `system`.
   */
  def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
    ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader)

  /**
   * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
   * By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
   * But "default" can be overridden in config
   *
   * If a configured serializer cannot be instantiated, the Exception is rethrown when
   * this value is first accessed.
   */
  lazy val serializers: Map[String, Serializer] = {
    val serializersConf = settings.Serializers
    for ((k: String, v: String) <- serializersConf)
      yield k -> serializerOf(v).fold(throw _, identity)
  }

  /**
   * bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
   *
   * It is built by inverting `settings.SerializationBindings`, which maps each alias to a list of class names.
   */
  lazy val bindings: Map[String, String] = {
    settings.SerializationBindings.foldLeft(Map[String, String]()) {
      //All keys which are lists, take the Strings from them and Map them
      case (result, (k: String, vs: Seq[_])) => result ++ (vs collect { case v: String => (v, k) })
      //For any other values, just skip them
      case (result, _)                       => result
    }
  }

  /**
   * serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class
   *
   * It starts out with one entry per element of `bindings`; `serializerFor` adds further entries.
   */
  private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = {
    val serializerMap = new ConcurrentHashMap[String, Serializer]
    for ((k, v) <- bindings) {
      serializerMap.put(k, serializers(v))
    }
    serializerMap
  }

  /**
   * Maps from a Serializer Identity (Int) to a Serializer instance (optimization)
   *
   * Contains the NullSerializer in addition to every entry of `serializers`.
   */
  lazy val serializerByIdentity: Map[Int, Serializer] =
    // The parentheses spell out how the infix chain already parses: `++` binds tighter than `map`.
    (Map(NullSerializer.identifier -> NullSerializer) ++ serializers) map { case (_, v) => (v.identifier, v) }
}