Refactored Serializer

This commit is contained in:
Jonas Bonér 2010-03-17 21:38:27 +01:00
parent 01ea070961
commit 8646c1f6bc
4 changed files with 54 additions and 49 deletions

View file

@ -17,41 +17,42 @@ import scala.collection.immutable.{Map, HashMap}
* @author Viktor Klang * @author Viktor Klang
*/ */
trait Cluster { trait Cluster {
/** /**
* Specifies the cluster name * Specifies the cluster name
*/ */
def name: String def name: String
/** /**
* Adds the specified hostname + port as a local node * Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster * This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach * and will be available at the other nodes through lookup and foreach
*/ */
def registerLocalNode(hostname: String, port: Int): Unit def registerLocalNode(hostname: String, port: Int): Unit
/** /**
* Removes the specified hostname + port from the local node * Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster * This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach * and will no longer be available at the other nodes through lookup and foreach
*/ */
def deregisterLocalNode(hostname: String, port: Int): Unit def deregisterLocalNode(hostname: String, port: Int): Unit
/** /**
* Sends the message to all Actors of the specified type on all other nodes in the cluster * Sends the message to all Actors of the specified type on all other nodes in the cluster
*/ */
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
/** /**
* Traverses all known remote addresses avaiable at all other nodes in the cluster * Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at * and applies the given PartialFunction on the first address that it's defined at
* The order of application is undefined and may vary * The order of application is undefined and may vary
*/ */
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
/** /**
* Applies the specified function to all known remote addresses on al other nodes in the cluster * Applies the specified function to all known remote addresses on al other nodes in the cluster
* The order of application is undefined and may vary * The order of application is undefined and may vary
*/ */
def foreach(f: (RemoteAddress) => Unit): Unit def foreach(f: (RemoteAddress) => Unit): Unit
} }
@ -159,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor {
case RegisterLocalNode(s) => { case RegisterLocalNode(s) => {
log debug ("RegisterLocalNode: %s", s) log debug ("RegisterLocalNode: %s", s)
local = Node(local.endpoints + s) local = Node(s :: local.endpoints)
broadcast(Papers(local.endpoints)) broadcast(Papers(local.endpoints))
} }
@ -242,7 +243,7 @@ object Cluster extends Cluster with Logging {
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
serializer setClassLoader loader serializer.classLoader = Some(loader)
try { try {
name map { name map {
fqn => fqn =>

View file

@ -18,19 +18,17 @@ object RemoteProtocolBuilder {
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(classLoader: ClassLoader) = { def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA = new Serializer.Java SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON = new Serializer.JavaJSON SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON = new Serializer.ScalaJSON SERIALIZER_SCALA_JSON.classLoader = Some(cl)
SERIALIZER_JAVA.setClassLoader(classLoader)
SERIALIZER_JAVA_JSON.setClassLoader(classLoader)
SERIALIZER_SCALA_JSON.setClassLoader(classLoader)
} }
def getMessage(request: RemoteRequest): Any = { def getMessage(request: RemoteRequest): Any = {
request.getProtocol match { request.getProtocol match {
case SerializationProtocol.SBINARY => case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] val renderer = Class.forName(
new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray) renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON => case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]

View file

@ -18,13 +18,13 @@ import sjson.json.{Serializer => SJSONSerializer}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait Serializer { trait Serializer {
def deepClone(obj: AnyRef): AnyRef var classLoader: Option[ClassLoader] = None
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
protected var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) def deepClone(obj: AnyRef): AnyRef
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
} }
// For Java API // For Java API
@ -55,7 +55,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Java extends Java object Java extends Java
class Java extends Serializer { trait Java extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
@ -67,8 +67,9 @@ object Serializer {
} }
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) val in =
else new ObjectInputStream(new ByteArrayInputStream(bytes)) if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject val obj = in.readObject
in.close in.close
obj obj
@ -79,18 +80,21 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Protobuf extends Protobuf object Protobuf extends Protobuf
class Protobuf extends Serializer { trait Protobuf extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]") if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray obj.asInstanceOf[Message].toByteArray
} }
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf") if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
// TODO: should we cache this method lookup? // TODO: should we cache this method lookup?
val message = clazz.get.getDeclaredMethod("getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message] val message = clazz.get.getDeclaredMethod(
"getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message]
message.toBuilder().mergeFrom(bytes).build message.toBuilder().mergeFrom(bytes).build
} }
@ -104,7 +108,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object JavaJSON extends JavaJSON object JavaJSON extends JavaJSON
class JavaJSON extends Serializer { trait JavaJSON extends Serializer {
private val mapper = new ObjectMapper private val mapper = new ObjectMapper
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
@ -118,9 +122,11 @@ object Serializer {
} }
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided") if (!clazz.isDefined) throw new IllegalArgumentException(
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) "Can't deserialize JSON to instance if no class is provided")
else new ObjectInputStream(new ByteArrayInputStream(bytes)) val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef] val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close in.close
obj obj
@ -136,7 +142,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ScalaJSON extends ScalaJSON object ScalaJSON extends ScalaJSON
class ScalaJSON extends Serializer { trait ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
@ -158,7 +164,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object SBinary extends SBinary object SBinary extends SBinary
class SBinary { trait SBinary {
import sbinary.DefaultProtocol._ import sbinary.DefaultProtocol._
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)

View file

@ -302,7 +302,7 @@ trait PersistentRef[T] extends Transactional with Committable {
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Logging { with Transactional with Committable with Logging {
abstract case class QueueOp sealed trait QueueOp
case object ENQ extends QueueOp case object ENQ extends QueueOp
case object DEQ extends QueueOp case object DEQ extends QueueOp