diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 7156c999bc..4a1d6012a7 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -17,41 +17,42 @@ import scala.collection.immutable.{Map, HashMap} * @author Viktor Klang */ trait Cluster { + /** - * Specifies the cluster name - */ + * Specifies the cluster name + */ def name: String /** - * Adds the specified hostname + port as a local node - * This information will be propagated to other nodes in the cluster - * and will be available at the other nodes through lookup and foreach - */ + * Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ def registerLocalNode(hostname: String, port: Int): Unit /** - * Removes the specified hostname + port from the local node - * This information will be propagated to other nodes in the cluster - * and will no longer be available at the other nodes through lookup and foreach - */ + * Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ def deregisterLocalNode(hostname: String, port: Int): Unit /** - * Sends the message to all Actors of the specified type on all other nodes in the cluster - */ + * Sends the message to all Actors of the specified type on all other nodes in the cluster + */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit /** - * Traverses all known remote addresses avaiable at all other nodes in the cluster - * and applies the given PartialFunction on the first address that it's defined at - * The order of application is undefined and may vary - */ + * Traverses all known remote addresses avaiable at all other nodes in the cluster + * and applies the given PartialFunction on the first address that it's defined at + * The order of application is undefined and may vary + */ def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] /** - * Applies the specified function to all known remote addresses on al other nodes in the cluster - * The order of application is undefined and may vary - */ + * Applies the specified function to all known remote addresses on al other nodes in the cluster + * The order of application is undefined and may vary + */ def foreach(f: (RemoteAddress) => Unit): Unit } @@ -159,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor { case RegisterLocalNode(s) => { log debug ("RegisterLocalNode: %s", s) - local = Node(local.endpoints + s) + local = Node(s :: local.endpoints) broadcast(Papers(local.endpoints)) } @@ -242,7 +243,7 @@ object Cluster extends Cluster with Logging { "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] - serializer setClassLoader loader + serializer.classLoader = Some(loader) try { name map { fqn => diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 287168140a..bfeec1c34e 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -18,19 +18,17 @@ object RemoteProtocolBuilder { private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf - def setClassLoader(classLoader: ClassLoader) = { - SERIALIZER_JAVA = new Serializer.Java - SERIALIZER_JAVA_JSON = new Serializer.JavaJSON - SERIALIZER_SCALA_JSON = new Serializer.ScalaJSON - SERIALIZER_JAVA.setClassLoader(classLoader) - SERIALIZER_JAVA_JSON.setClassLoader(classLoader) - SERIALIZER_SCALA_JSON.setClassLoader(classLoader) + def setClassLoader(cl: ClassLoader) = { + SERIALIZER_JAVA.classLoader = Some(cl) + SERIALIZER_JAVA_JSON.classLoader = Some(cl) + SERIALIZER_SCALA_JSON.classLoader = Some(cl) } def getMessage(request: RemoteRequest): Any = { request.getProtocol match { case SerializationProtocol.SBINARY => - val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] + val renderer = Class.forName( + new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) case SerializationProtocol.SCALA_JSON => val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 1cc930a7eb..c878548711 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -18,13 +18,13 @@ import sjson.json.{Serializer => SJSONSerializer} * @author Jonas Bonér */ trait Serializer { - def deepClone(obj: AnyRef): AnyRef - def out(obj: AnyRef): Array[Byte] - def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef - - protected var classLoader: Option[ClassLoader] = None + var classLoader: Option[ClassLoader] = None - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) + def deepClone(obj: AnyRef): AnyRef + + def out(obj: AnyRef): Array[Byte] + + def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef } // For Java API @@ -55,7 +55,7 @@ object Serializer { * @author Jonas Bonér */ object Java extends Java - class Java extends Serializer { + trait Java extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -67,8 +67,9 @@ object Serializer { } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) + else new ObjectInputStream(new ByteArrayInputStream(bytes)) val obj = in.readObject in.close obj @@ -79,18 +80,21 @@ object Serializer { * @author Jonas Bonér */ object Protobuf extends Protobuf - class Protobuf extends Serializer { + trait Protobuf extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { - if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]") + if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( + "Can't serialize a non-protobuf message using protobuf [" + obj + "]") obj.asInstanceOf[Message].toByteArray } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf") + if (!clazz.isDefined) throw new IllegalArgumentException( + "Need a protobuf message class to be able to serialize bytes using protobuf") // TODO: should we cache this method lookup? - val message = clazz.get.getDeclaredMethod("getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message] + val message = clazz.get.getDeclaredMethod( + "getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message] message.toBuilder().mergeFrom(bytes).build } @@ -104,7 +108,7 @@ object Serializer { * @author Jonas Bonér */ object JavaJSON extends JavaJSON - class JavaJSON extends Serializer { + trait JavaJSON extends Serializer { private val mapper = new ObjectMapper def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) @@ -118,9 +122,11 @@ object Serializer { } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided") - val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) + if (!clazz.isDefined) throw new IllegalArgumentException( + "Can't deserialize JSON to instance if no class is provided") + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) + else new ObjectInputStream(new ByteArrayInputStream(bytes)) val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef] in.close obj @@ -136,7 +142,7 @@ object Serializer { * @author Jonas Bonér */ object ScalaJSON extends ScalaJSON - class ScalaJSON extends Serializer { + trait ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) @@ -158,7 +164,7 @@ object Serializer { * @author Jonas Bonér */ object SBinary extends SBinary - class SBinary { + trait SBinary { import sbinary.DefaultProtocol._ def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index ff37dde82e..f21490a1be 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -302,7 +302,7 @@ trait PersistentRef[T] extends Transactional with Committable { trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] with Transactional with Committable with Logging { - abstract case class QueueOp + sealed trait QueueOp case object ENQ extends QueueOp case object DEQ extends QueueOp