diff --git a/.gitignore b/.gitignore index a11c2f1e00..451f208d2b 100755 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,6 @@ akka-docs/_build/ akka-tutorials/akka-tutorial-first/project/boot/ akka-tutorials/akka-tutorial-first/project/plugins/project/ akka-docs/exts/ -_akka_cluster/ \ No newline at end of file +_akka_cluster/ +Makefile +akka.sublime-project \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index b321d3eade..c9750ff912 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -14,7 +14,14 @@ class DeployerSpec extends WordSpec with MustMatchers { "be able to parse 'akka.actor.deployment._' config elements" in { val deployment = Deployer.lookupInConfig("service-pi") deployment must be ('defined) - deployment must equal (Some(Deploy("service-pi", RoundRobin, Clustered(Node("test-1"), Replicate(3), Stateless)))) + deployment must equal (Some(Deploy( + "service-pi", + RoundRobin, + "akka.serializer.Format$Default$", + Clustered( + Node("test-1"), + Replicate(3), + Stateless)))) } } } diff --git a/akka-actor/src/main/java/akka/actor/Actors.java b/akka-actor/src/main/java/akka/actor/Actors.java index a5ec9f37dc..3ef7bc254b 100644 --- a/akka-actor/src/main/java/akka/actor/Actors.java +++ b/akka-actor/src/main/java/akka/actor/Actors.java @@ -3,6 +3,8 @@ package akka.actor; import akka.japi.Creator; import akka.remoteinterface.RemoteSupport; +import com.eaio.uuid.UUID; + /** * JAVA API for * - creating actors, @@ -28,6 +30,30 @@ public class Actors { return Actor$.MODULE$.remote(); } + /** + * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the + * UntypedActor instance directly, but only through its 'ActorRef' wrapper reference. + *

+ * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor. + * Only use this method when you need to pass in constructor arguments into the 'UntypedActor'. + *

+ * You use it by implementing the UntypedActorFactory interface. + * Example in Java: + *

+   *   ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
+   *     public UntypedActor create() {
+   *       return new MyUntypedActor("service:name", 5);
+   *     }
+   *   }, "my-actor-address");
+   *   actor.start();
+   *   actor.sendOneWay(message, context);
+   *   actor.stop();
+   * 
+ */ + public static ActorRef actorOf(final Creator factory, final String address) { + return Actor$.MODULE$.actorOf(factory, address); + } + /** * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the * UntypedActor instance directly, but only through its 'ActorRef' wrapper reference. @@ -49,14 +75,32 @@ public class Actors { * */ public static ActorRef actorOf(final Creator factory) { - return Actor$.MODULE$.actorOf(factory); + return Actor$.MODULE$.actorOf(factory, new UUID().toString()); } /** * Creates an ActorRef out of the Actor type represented by the class provided. * Example in Java: *
-   *   ActorRef actor = Actors.actorOf(MyUntypedActor.class);
+   *   ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
+   *   actor.start();
+   *   actor.sendOneWay(message, context);
+   *   actor.stop();
+   * 
+ * You can create and start the actor in one statement like this: + *
+   *   val actor = Actors.actorOf(MyActor.class, "my-actor-address").start();
+   * 
+ */ + public static ActorRef actorOf(final Class type, final String address) { + return Actor$.MODULE$.actorOf(type, address); + } + + /** + * Creates an ActorRef out of the Actor type represented by the class provided. + * Example in Java: + *
+   *   ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
    *   actor.start();
    *   actor.sendOneWay(message, context);
    *   actor.stop();
@@ -67,7 +111,7 @@ public class Actors {
    * 
*/ public static ActorRef actorOf(final Class type) { - return Actor$.MODULE$.actorOf(type); + return Actor$.MODULE$.actorOf(type, new UUID().toString()); } /** diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 7a08a8d48e..90ecd737ef 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -11,11 +11,12 @@ import Helpers.{narrow, narrowSilently} import akka.remoteinterface.RemoteSupport import akka.japi.{Creator, Procedure} import akka.AkkaException +import akka.serialization._ +import akka.event.EventHandler import scala.reflect.BeanProperty import com.eaio.uuid.UUID -import akka.event.EventHandler /** * Life-cycle messages for the Actors @@ -145,8 +146,8 @@ object Actor extends ListenerManagement { * val actor = actorOf[MyActor].start() * */ - def actorOf[T <: Actor : Manifest](address: String): ActorRef = - actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) + def actorOf[T <: Actor : Manifest](address: String): ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) /** * Creates an ActorRef out of the Actor with type T. @@ -163,8 +164,8 @@ object Actor extends ListenerManagement { * val actor = actorOf[MyActor].start * */ - def actorOf[T <: Actor : Manifest]: ActorRef = - actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) + def actorOf[T <: Actor : Manifest]: ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -181,7 +182,8 @@ object Actor extends ListenerManagement { * val actor = actorOf(classOf[MyActor]).start() * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = actorOf(clazz, new UUID().toString) + def actorOf[T <: Actor](clazz: Class[T]): ActorRef = + actorOf(clazz, new UUID().toString) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -197,28 +199,63 @@ object Actor extends ListenerManagement { * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor], address: String): ActorRef = { + def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = { import DeploymentConfig._ + import ReflectiveAccess._ Address.validate(address) try { Deployer.deploymentFor(address) match { - case Deploy(_, router, Local) => + case Deploy(_, router, _, Local) => // FIXME handle 'router' in 'Local' actors newLocalActorRef(clazz, address) - case Deploy(_, router, Clustered(home, replication , state)) => - sys.error("Clustered deployment not yet supported") - /* + case Deploy(_, router, formatClassName, Clustered(home, replication, state)) => + ClusterModule.ensureEnabled() if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") + + val hostname = home match { + case Host(hostname) => hostname + case IP(address) => address + case Node(nodeName) => "localhost" // FIXME lookup hostname for node name + } + + val replicas = replication match { + case Replicate(replicas) => replicas + case AutoReplicate => -1 + case NoReplicas => 0 + } + + import ClusterModule.node + if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor + + def formatErrorDueTo(reason: String) = { + throw new akka.config.ConfigurationException( + "Could not create Format[T] object [" + formatClassName + + "] for serialization of actor [" + address + + "] since " + reason) + } + + implicit val format: Format[T] = { + if (formatClassName == "N/A") formatErrorDueTo("no class name defined in configuration") + val f = ReflectiveAccess.getObjectFor(formatClassName).getOrElse(formatErrorDueTo("it could not be loaded")) + if (f.isInstanceOf[Format[T]]) f.asInstanceOf[Format[T]] + else formatErrorDueTo("class must be of type [akka.serialization.Format[T]]") + } + + if (!node.isClustered(address)) node.store(address, clazz, replicas, false) + node.use(address) + } else { + //val router = + node.ref(address, null) + } + sys.error("Clustered deployment not yet supported") + + /* val remoteAddress = Actor.remote.address if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) { // home node for actor - if (!node.isClustered(address)) node.store(clazz, address) - node.use(address).head } else { - val router = - node.ref(address, router) } */ /* @@ -231,9 +268,6 @@ object Actor extends ListenerManagement { Misc stuff: - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - - Deployer should: - 1. Check if deployment exists in ZK - 2. If not, upload it - ClusterNode API and Actor.remote API should be made private[akka] - Rewrite ClusterSpec or remove it - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor @@ -275,7 +309,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start() * */ - def actorOf(factory: => Actor): ActorRef = actorOf(factory, new UUID().toString) + def actorOf[T <: Actor](factory: => T): ActorRef = actorOf(factory, new UUID().toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -295,7 +329,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor, address: String): ActorRef = { + def actorOf[T <: Actor](factory: => T, address: String): ActorRef = { Address.validate(address) new LocalActorRef(() => factory, address) } @@ -309,7 +343,8 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor]): ActorRef = actorOf(creator, new UUID().toString) + def actorOf[T <: Actor](creator: Creator[T]): ActorRef = + actorOf(creator, new UUID().toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -319,7 +354,7 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor], address: String): ActorRef = { + def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = { Address.validate(address) new LocalActorRef(() => creator.create, address) } @@ -332,7 +367,7 @@ object Actor extends ListenerManagement { * there is a method 'spawn[ActorType]' in the Actor trait already. * Example: *
-   * import Actor.{spawn}
+   * import Actor.spawn
    *
    * spawn  {
    *   ... // do stuff
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 9e802e48f3..b36d90aa5b 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -967,7 +967,7 @@ private[akka] case class RemoteActorRef private[akka] (
 
   import DeploymentConfig._
   val remoteAddress = Deployer.deploymentFor(address) match {
-    case Deploy(_, _, Clustered(home, _, _)) =>
+    case Deploy(_, _, _, Clustered(home, _, _)) =>
       val hostname = home match {
         case Host(hostname) => hostname
         case IP(address)    => address
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index 4d94b8024f..335f331902 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -13,6 +13,7 @@ import java.util.{Set => JSet}
 
 import akka.util.ReflectiveAccess._
 import akka.util.{ReflectiveAccess, ReadWriteGuard, ListenerManagement}
+import akka.serialization._
 
 /**
  * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@@ -101,8 +102,9 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
   /**
    *  Registers an actor in the Cluster ActorRegistry.
    */
-  private[akka] def registerInCluster(address: String, actor: ActorRef) {
-    ClusterModule.node.store(address, actor)
+  private[akka] def registerInCluster[T <: Actor](
+    address: String, actor: ActorRef, replicas: Int, serializeMailbox: Boolean = false)(implicit format: Format[T]) {
+    ClusterModule.node.store(address, actor, replicas, serializeMailbox)
   }
 
   /**
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 177c943169..7ff3659919 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -16,40 +16,6 @@ import akka.AkkaException
 
 /**
  * Programatic deployment configuration classes. Most values have defaults and can be left out.
- * 

- * Example Scala API: - *

- *   import akka.actor.DeploymentConfig._
- *
- *   val deploymentHello = Deploy("service:hello", Local)
- *
- *   val deploymentE     = Deploy("service:e", AutoReplicate, Clustered(Host("darkstar.lan"), Stateful))
- *
- *   val deploymentPi1   = Deploy("service:pi", Replicate(3), Clustered(Host("darkstar.lan"), Stateless(RoundRobin)))
- *
- *   // same thing as 'deploymentPi1' but more explicit
- *   val deploymentPi2   =
- *     Deploy(
- *       address = "service:pi",
- *         replicas = 3,
- *         scope = Clustered(
- *           home = Host("darkstar.lan")
- *           state = Stateless(
- *             routing = RoundRobin
- *           )
- *         )
- *       )
- * 
- * Example Java API: - *
- *   import static akka.actor.*;
- *
- *   val deploymentHello = new Deploy("service:hello", new Local());
- *
- *   val deploymentE     = new Deploy("service:e", new AutoReplicate(), new Clustered(new Host("darkstar.lan"), new Stateful()));
- *
- *   val deploymentPi1   = new Deploy("service:pi", new Replicate(3), new Clustered(new Host("darkstar.lan"), new Stateless(new RoundRobin())))
- * 
* * @author Jonas Bonér */ @@ -58,7 +24,7 @@ object DeploymentConfig { // -------------------------------- // --- Deploy // -------------------------------- - case class Deploy(address: String, routing: Routing = Direct, scope: Scope = Local) + case class Deploy(address: String, routing: Routing = Direct, format: String = "N/A", scope: Scope = Local) // -------------------------------- // --- Routing @@ -179,7 +145,7 @@ object Deployer { } def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, Local) => true + case Deploy(_, _, _, Local) => true case _ => false } @@ -251,7 +217,7 @@ object Deployer { // -------------------------------- val addressPath = "akka.actor.deployment." + address Config.config.getSection(addressPath) match { - case None => Some(Deploy(address, Direct, Local)) + case None => Some(Deploy(address, Direct, "N/A", Local)) case Some(addressConfig) => // -------------------------------- @@ -275,12 +241,17 @@ object Deployer { CustomRouter(customRouter) } + // -------------------------------- + // akka.actor.deployment.
.format + // -------------------------------- + val format = addressConfig.getString("format", "N/A") + // -------------------------------- // akka.actor.deployment.
.clustered // -------------------------------- addressConfig.getSection("clustered") match { case None => - Some(Deploy(address, router, Local)) // deploy locally + Some(Deploy(address, router, "N/A", Local)) // deploy locally case Some(clusteredConfig) => @@ -334,7 +305,7 @@ object Deployer { if (clusteredConfig.getBool("stateless", false)) Stateless else Stateful - Some(Deploy(address, router, Clustered(home, replicas, state))) + Some(Deploy(address, router, format, Clustered(home, replicas, state))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Format.scala b/akka-actor/src/main/scala/akka/actor/Format.scala new file mode 100644 index 0000000000..f61ad3b0ae --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Format.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.serialization + +import akka.actor.Actor + +import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} + +/** + * Type class definition for Actor Serialization + */ +trait FromBinary[T <: Actor] { + def fromBinary(bytes: Array[Byte], act: T): T +} + +trait ToBinary[T <: Actor] { + def toBinary(t: T): Array[Byte] +} + +// client needs to implement Format[] for the respective actor +trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] + +/** + * @author Jonas Bonér + */ +trait Serializer extends scala.Serializable { + @volatile var classLoader: Option[ClassLoader] = None + def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) + + def toBinary(obj: AnyRef): Array[Byte] + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef +} + +/** + * A default implementation for a stateless actor + * + * Create a Format object with the client actor as the implementation of the type class + * + *
+ * object BinaryFormatMyStatelessActor  {
+ *   implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
+ * }
+ * 
+ */ +trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable { + def fromBinary(bytes: Array[Byte], act: T) = act + + def toBinary(ac: T) = Array.empty[Byte] +} + +/** + * A default implementation of the type class for a Format that specifies a serializer + * + * Create a Format object with the client actor as the implementation of the type class and + * a serializer object + * + *
+ * object BinaryFormatMyJavaSerializableActor  {
+ *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor]  {
+ *     val serializer = Serializers.Java
+ * }
+ * }
+ * 
+ */ +trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { + val serializer: Serializer + + def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T] + + def toBinary(ac: T) = serializer.toBinary(ac) +} diff --git a/akka-actor/src/main/scala/akka/actor/Routing.scala b/akka-actor/src/main/scala/akka/actor/Routing.scala new file mode 100644 index 0000000000..57e6dcf41e --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Routing.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor + +import akka.AkkaException + +class RoutingException(message: String) extends AkkaException(message) + +sealed trait RouterType + +/** + * @author Jonas Bonér + */ +object RouterType { + object Direct extends RouterType + object Random extends RouterType + object RoundRobin extends RouterType +} + +// FIXME move all routing in cluster here when we can \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 3fb44ec1aa..f7b4598dce 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -8,6 +8,7 @@ import akka.actor._ import akka.config.Config._ import akka.config.ConfigurationException import akka.util.{ListenerManagement, ReflectiveAccess} +import akka.serialization._ import akka.AkkaException /** @@ -89,6 +90,8 @@ object EventHandler extends ListenerManagement { lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build + implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener] + val level: Int = config.getString("akka.event-handler-level", "INFO") match { case "ERROR" => ErrorLevel case "WARNING" => WarningLevel @@ -107,8 +110,7 @@ object EventHandler extends ListenerManagement { defaultListeners foreach { listenerName => try { ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz => - val listener = Actor.actorOf(clazz, listenerName).start() - addListener(listener) + addListener(Actor.actorOf(clazz, listenerName).start()) } } catch { case e: akka.actor.DeploymentAlreadyBoundException => // do nothing diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 20104fdcdb..a013207a8f 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -8,6 +8,7 @@ import akka.japi.Creator import akka.actor._ import akka.util._ import akka.dispatch.CompletableFuture +import akka.serialization._ import akka.AkkaException import scala.reflect.BeanProperty @@ -137,7 +138,9 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { val eventHandler: ActorRef = { - val handler = Actor.actorOf[RemoteEventHandler](classOf[RemoteEventHandler].getName).start() + implicit object format extends StatelessActorFormat[RemoteEventHandler] + val clazz = classOf[RemoteEventHandler] + val handler = Actor.actorOf(clazz, clazz.getName).start() // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 5030b33030..5c16a3f8c5 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -12,6 +12,7 @@ import akka.remoteinterface.RemoteSupport import akka.actor._ import akka.event.EventHandler import akka.actor.DeploymentConfig.Deploy +import akka.serialization.Format /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -64,11 +65,19 @@ object ReflectiveAccess { } type ClusterNode = { - def nrOfActors: Int - def store[T <: Actor](address: String, actorRef: ActorRef) -// (implicit format: Format[T]) + def store[T <: Actor] + (address: String, actorClass: Class[T], replicas: Int, serializeMailbox: Boolean) + (implicit format: Format[T]) + + def store[T <: Actor] + (address: String, actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean) + (implicit format: Format[T]) + def remove(address: String) - def use(address: String): Option[ActorRef] + def use(address: String): Array[ActorRef] + def ref(address: String, router: RouterType): ActorRef + def isClustered(address: String): Boolean + def nrOfActors: Int } type ClusterDeployer = { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5af3e65e1a..73fc43f379 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -33,7 +33,7 @@ import akka.event.EventHandler import akka.dispatch.{Dispatchers, Future} import akka.remoteinterface._ import akka.config.Config._ -import akka.serialization.{Format, Serializer} +import akka.serialization.{Format, Serializers} import akka.serialization.Compression.LZF import akka.AkkaException @@ -413,7 +413,7 @@ class ClusterNode private[akka] ( val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry" val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations" val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids" - val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address" + val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address" val baseNodes = List( CLUSTER_NODE, MEMBERSHIP_NODE, @@ -852,7 +852,7 @@ class ClusterNode private[akka] ( /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: Router.RouterType): ActorRef = if (isConnected.isOn) { + def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) { val addresses = addressesForActor(actorAddress) val actorType = ActorType.ScalaActor // FIXME later we also want to suppot TypedActor, then 'actorType' needs to be configurable @@ -1031,7 +1031,7 @@ class ClusterNode private[akka] ( def send(f: Function0[Unit], replicationFactor: Int): Unit = { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN0_UNIT) - .setPayload(ByteString.copyFrom(Serializer.Java.toBinary(f))) + .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f))) .build replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) } @@ -1043,7 +1043,7 @@ class ClusterNode private[akka] ( def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN0_ANY) - .setPayload(ByteString.copyFrom(Serializer.Java.toBinary(f))) + .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f))) .build val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message) results.toList.asInstanceOf[List[Future[Any]]] @@ -1056,7 +1056,7 @@ class ClusterNode private[akka] ( def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN1_ARG_UNIT) - .setPayload(ByteString.copyFrom(Serializer.Java.toBinary((f, arg)))) + .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg)))) .build replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) } @@ -1069,7 +1069,7 @@ class ClusterNode private[akka] ( def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN1_ARG_ANY) - .setPayload(ByteString.copyFrom(Serializer.Java.toBinary((f, arg)))) + .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg)))) .build val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message) results.toList.asInstanceOf[List[Future[Any]]] @@ -1290,7 +1290,7 @@ class ClusterNode private[akka] ( val to = remoteServerAddress val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FAIL_OVER_CONNECTIONS) - .setPayload(ByteString.copyFrom(Serializer.Java.toBinary((from, to)))) + .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to)))) .build membershipNodes foreach { node => replicaConnections.get(node) foreach { case (_, connection) => @@ -1587,6 +1587,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { } private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { - Serializer.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T] + Serializers.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T] } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index 6eb438d536..ebad829293 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -69,8 +69,9 @@ object ClusterDeployer { private val systemDeployments = List( Deploy( - RemoteClusterDaemon.ADDRESS, Direct, - Clustered(Deployer.defaultAddress, NoReplicas, Stateless)) + address = RemoteClusterDaemon.ADDRESS, + routing = Direct, + scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless)) ) private[akka] def init(deployments: List[Deploy]) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index d7777cf5f8..c7f6760166 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -7,6 +7,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ +import akka.actor.RouterType._ import akka.dispatch.Future import akka.AkkaException @@ -20,30 +21,25 @@ class RoutingException(message: String) extends AkkaException(message) * @author Jonas Bonér */ object Router { - sealed trait RouterType - object Direct extends RouterType - object Random extends RouterType - object RoundRobin extends RouterType - def newRouter( routerType: RouterType, addresses: Array[Tuple2[UUID, InetSocketAddress]], - address: String, + serviceId: String, timeout: Long, actorType: ActorType, replicationStrategy: ReplicationStrategy = ReplicationStrategy.WriteThrough): ClusterActorRef = { routerType match { case Direct => new ClusterActorRef( - addresses, address, timeout, + addresses, serviceId, timeout, actorType, replicationStrategy) with Direct case Random => new ClusterActorRef( - addresses, address, timeout, + addresses, serviceId, timeout, actorType, replicationStrategy) with Random case RoundRobin => new ClusterActorRef( - addresses, address, timeout, + addresses, serviceId, timeout, actorType, replicationStrategy) with RoundRobin } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 8df23e9434..c9e1a15b37 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -8,7 +8,7 @@ import org.I0Itec.zkclient._ import akka.actor._ import akka.actor.Actor._ -import akka.serialization.{Serializer, SerializerBasedActorFormat} +import akka.serialization.{Serializers, SerializerBasedActorFormat} import akka.util.Helpers._ import akka.actor.DeploymentConfig._ @@ -30,7 +30,7 @@ class MyJavaSerializableActor extends Actor with Serializable { object BinaryFormatMyJavaSerializableActor { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] with Serializable { - val serializer = Serializer.Java + val serializer = Serializers.Java } } @@ -787,7 +787,7 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with node2.stop node3.stop } -*/ + "be able to create a reference to a replicated actor by address using Router.RoundRobin routing" in { // create actor val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start @@ -830,6 +830,8 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with node2.stop node3.stop } + +*/ } override def beforeAll() = { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala b/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala index a4b90e457f..8c932ec79a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusteredPingPongSample.scala @@ -8,7 +8,7 @@ import akka.cluster._ import akka.actor._ import akka.actor.Actor._ -import akka.serialization.{Serializer, SerializerBasedActorFormat} +import akka.serialization.{Serializers, SerializerBasedActorFormat} import java.util.concurrent.CountDownLatch @@ -67,15 +67,16 @@ object PingPong { object BinaryFormats { implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable { - val serializer = Serializer.Java + val serializer = Serializers.Java } implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable { - val serializer = Serializer.Java + val serializer = Serializers.Java } } } +/* object ClusteredPingPongSample { import PingPong._ import BinaryFormats._ @@ -145,3 +146,4 @@ object ClusteredPingPongSample { Cluster.shutdownLocalCluster() } } +*/ \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala b/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala index b93edb7bdf..aef51de905 100644 --- a/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala +++ b/akka-cluster/src/test/scala/akka/cluster/PingPongMultiJvmExample.scala @@ -7,7 +7,7 @@ package example.cluster import akka.cluster._ import akka.actor._ -import akka.serialization.{Serializer, SerializerBasedActorFormat} +import akka.serialization.{Serializers, SerializerBasedActorFormat} import akka.util.duration._ object PingPong { @@ -69,15 +69,16 @@ object PingPong { object BinaryFormats { implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable { - val serializer = Serializer.Java + val serializer = Serializers.Java } implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable { - val serializer = Serializer.Java + val serializer = Serializers.Java } } } +/* object PingPongMultiJvmNode1 { import PingPong._ import BinaryFormats._ @@ -238,4 +239,4 @@ class PongNode(number: Int) { node.stop } } - +*/ \ No newline at end of file diff --git a/akka-docs/pending/serialization-scala.rst b/akka-docs/pending/serialization-scala.rst index a0b0e312e6..166a3291fe 100644 --- a/akka-docs/pending/serialization-scala.rst +++ b/akka-docs/pending/serialization-scala.rst @@ -61,7 +61,7 @@ Step 2: Implement the type class for the actor object BinaryFormatMyActor { implicit object MyActorFormat extends Format[MyActor] { def fromBinary(bytes: Array[Byte], act: MyActor) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } @@ -170,7 +170,7 @@ Create a module for the type class .. object BinaryFormatMyJavaSerializableActor { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { - val serializer = Serializer.Java + val serializer = Serializers.Java } } @@ -258,7 +258,7 @@ Step 2: Implement the type class for the actor class MyTypedActorFormat extends Format[MyTypedActorImpl] { def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } @@ -316,7 +316,7 @@ If you are sending messages to a remote Actor and these messages implement one o Each serialization interface/trait in * akka.serialization.Serializable.* > has a matching serializer in -* akka.serialization.Serializer.* +* akka.serialization.Serializers.* Note however that if you are using one of the Serializable interfaces then you don’t have to do anything else in regard to sending remote messages. @@ -326,7 +326,7 @@ The ones currently supported are (besides the default which is regular Java seri * SBinary (Scala only) * Protobuf (Scala and Java) -Apart from the above, Akka also supports Scala object serialization through `SJSON `_ that implements APIs similar to 'akka.serialization.Serializer.*'. See the section on SJSON below for details. +Apart from the above, Akka also supports Scala object serialization through `SJSON `_ that implements APIs similar to 'akka.serialization.Serializers.*'. See the section on SJSON below for details. Protobuf -------- @@ -400,7 +400,7 @@ For your POJOs to be able to serialize themselves you have to extend the ScalaJS s.fromJSON(s.toJSON) should equal(s) } -Use akka.serialization.Serializer.ScalaJSON to do generic JSON serialization, e.g. serialize object that does not extend ScalaJSON using the JSON serializer. Serialization using Serializer can be done in two ways :- +Use akka.serialization.Serializers.ScalaJSON to do generic JSON serialization, e.g. serialize object that does not extend ScalaJSON using the JSON serializer. Serialization using Serializer can be done in two ways :- 1. Type class based serialization (recommended) 2. Reflection based serialization @@ -430,7 +430,7 @@ Here are the steps that you need to follow: .. code-block:: scala - import akka.serialization.Serializer.ScalaJSON + import akka.serialization.Serializers.ScalaJSON val o = MyMessage("dg", ("akka", 100)) fromjson[MyMessage](tojson(o)) should equal(o) @@ -451,26 +451,26 @@ You can also use the Serializer abstraction to serialize using the reflection ba } val foo = new Foo("bar") - val json = Serializer.ScalaJSON.out(foo) + val json = Serializers.ScalaJSON.out(foo) - val fooCopy = Serializer.ScalaJSON.in(json) // returns a JsObject as an AnyRef + val fooCopy = Serializers.ScalaJSON.in(json) // returns a JsObject as an AnyRef - val fooCopy2 = Serializer.ScalaJSON.in(new String(json)) // can also take a string as input + val fooCopy2 = Serializers.ScalaJSON.in(new String(json)) // can also take a string as input - val fooCopy3 = Serializer.ScalaJSON.in[Foo](json).asInstanceOf[Foo] + val fooCopy3 = Serializers.ScalaJSON.in[Foo](json).asInstanceOf[Foo] Classes without a @BeanInfo annotation cannot be serialized as JSON. So if you see something like that: .. code-block:: scala - scala> Serializer.ScalaJSON.out(bar) - Serializer.ScalaJSON.out(bar) + scala> Serializers.ScalaJSON.out(bar) + Serializers.ScalaJSON.out(bar) java.lang.UnsupportedOperationException: Class class Bar not supported for conversion at sjson.json.JsBean$class.toJSON(JsBean.scala:210) - at sjson.json.Serializer$SJSON$.toJSON(Serializer.scala:107) - at sjson.json.Serializer$SJSON$class.out(Serializer.scala:37) - at sjson.json.Serializer$SJSON$.out(Serializer.scala:107) + at sjson.json.Serializer$SJSON$.toJSON(Serializers.scala:107) + at sjson.json.Serializer$SJSON$class.out(Serializers.scala:37) + at sjson.json.Serializer$SJSON$.out(Serializers.scala:107) at akka.serialization.Serializer$ScalaJSON... it means, that you haven't got a @BeanInfo annotation on your class. @@ -747,8 +747,8 @@ and the serialization code like the following: object TestSerialize{ def main(args: Array[String]) { val test1 = new D(List(B("hello1"))) - val json = sjson.json.Serializer.SJSON.out(test1) - val res = sjson.json.Serializer.SJSON.in[D](json) + val json = sjson.json.Serializers.SJSON.out(test1) + val res = sjson.json.Serializers.SJSON.in[D](json) val res1: D = res.asInstanceOf[D] println(res1) } @@ -931,15 +931,15 @@ SBinary: Scala To serialize Scala structures you can use SBinary serializer. SBinary can serialize all primitives and most default Scala datastructures; such as List, Tuple, Map, Set, BigInt etc. -Here is an example of using the akka.serialization.Serializer.SBinary serializer to serialize standard Scala library objects. +Here is an example of using the akka.serialization.Serializers.SBinary serializer to serialize standard Scala library objects. .. code-block:: scala import akka.serialization.Serializer import sbinary.DefaultProtocol._ // you always need to import these implicits val users = List(("user1", "passwd1"), ("user2", "passwd2"), ("user3", "passwd3")) - val bytes = Serializer.SBinary.out(users) - val usersCopy = Serializer.SBinary.in(bytes, Some(classOf[List[Tuple2[String,String]]])) + val bytes = Serializers.SBinary.out(users) + val usersCopy = Serializers.SBinary.in(bytes, Some(classOf[List[Tuple2[String,String]]])) If you need to serialize your own user-defined objects then you have to do three things: # Define an empty constructor diff --git a/akka-http/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME b/akka-http/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME deleted file mode 100644 index 51bc8cccd2..0000000000 --- a/akka-http/src/main/resources/META-INF/services/javax.ws.rs.ext.MessageBodyWriter_FIXME +++ /dev/null @@ -1 +0,0 @@ -se.scalablesolutions.akka.rest.ListWriter diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 3258fe3461..6da0004bab 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -4,17 +4,17 @@ package akka.remote -import akka.serialization.{Serializer, Serializable} +import akka.serialization.{Serializers, Serializable} import akka.remote.protocol.RemoteProtocol._ import akka.util._ import com.google.protobuf.{Message, ByteString} object MessageSerializer { - private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java - private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON - private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON - private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf + private def SERIALIZER_JAVA: Serializers.Java = Serializers.Java + private def SERIALIZER_JAVA_JSON: Serializers.JavaJSON = Serializers.JavaJSON + private def SERIALIZER_SCALA_JSON: Serializers.ScalaJSON = Serializers.ScalaJSON + private def SERIALIZER_PROTOBUF: Serializers.Protobuf = Serializers.Protobuf def setClassLoader(cl: ClassLoader) = { val someCl = Some(cl) diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index adfd2f3bfd..5474ce6cf7 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -19,58 +19,6 @@ import akka.util.ReflectiveAccess import java.net.InetSocketAddress import akka.remote. {RemoteClientSettings, MessageSerializer} -/** - * Type class definition for Actor Serialization - */ -trait FromBinary[T <: Actor] { - def fromBinary(bytes: Array[Byte], act: T): T -} - -trait ToBinary[T <: Actor] { - def toBinary(t: T): Array[Byte] -} - -// client needs to implement Format[] for the respective actor -trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] - -/** - * A default implementation for a stateless actor - * - * Create a Format object with the client actor as the implementation of the type class - * - *
- * object BinaryFormatMyStatelessActor  {
- *   implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
- * }
- * 
- */ -trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable { - def fromBinary(bytes: Array[Byte], act: T) = act - - def toBinary(ac: T) = Array.empty[Byte] -} - -/** - * A default implementation of the type class for a Format that specifies a serializer - * - * Create a Format object with the client actor as the implementation of the type class and - * a serializer object - * - *
- * object BinaryFormatMyJavaSerializableActor  {
- *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor]  {
- *     val serializer = Serializer.Java
- * }
- * }
- * 
- */ -trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { - val serializer: Serializer - - def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T] - - def toBinary(ac: T) = serializer.toBinary(ac) -} /** * Module for local actor serialization. @@ -140,7 +88,7 @@ object ActorSerialization { builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) - if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializer.Java.toBinary(actorRef.hotswap))) + if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap))) builder.build } diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index 3fc661afce..2b58d05f87 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -14,20 +14,9 @@ import org.codehaus.jackson.map.ObjectMapper import sjson.json.{Serializer => SJSONSerializer} -/** - * @author Jonas Bonér - */ -trait Serializer extends scala.Serializable { - @volatile var classLoader: Option[ClassLoader] = None - def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) - - def toBinary(obj: AnyRef): Array[Byte] - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef -} - // For Java API class SerializerFactory { - import Serializer._ + import Serializers._ def getJava: Java.type = Java def getJavaJSON: JavaJSON.type = JavaJSON def getScalaJSON: ScalaJSON.type = ScalaJSON @@ -37,7 +26,7 @@ class SerializerFactory { /** * @author Jonas Bonér */ -object Serializer { +object Serializers { val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) object NOOP extends NOOP diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala index cd8f71058e..a17db38342 100644 --- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala +++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala @@ -6,7 +6,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import akka.serialization.Serializer.ScalaJSON +import akka.serialization.Serializers.ScalaJSON //TODO: FIXME WHY IS THIS COMMENTED OUT? object Protocols { diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 39584726f9..682d647751 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -22,7 +22,7 @@ class SerializableTypeClassActorSpec extends object BinaryFormatMyActor { implicit object MyActorFormat extends Format[MyActor] { def fromBinary(bytes: Array[Byte], act: MyActor) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } @@ -34,7 +34,7 @@ class SerializableTypeClassActorSpec extends object BinaryFormatMyActorWithDualCounter { implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] { def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] act.count1 = p.getCount1 act.count2 = p.getCount2 act @@ -58,7 +58,7 @@ class SerializableTypeClassActorSpec extends object BinaryFormatMyJavaSerializableActor { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { - val serializer = Serializer.Java + val serializer = Serializers.Java } } diff --git a/akka-remote/src/test/scala/serialization/SerializerSpec.scala b/akka-remote/src/test/scala/serialization/SerializerSpec.scala index 9c503b3f97..349c772f59 100644 --- a/akka-remote/src/test/scala/serialization/SerializerSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializerSpec.scala @@ -20,20 +20,20 @@ class SerializerSpec extends JUnitSuite { @Test def shouldSerializeString = { val f = Foo("debasish") - val json = Serializer.ScalaJSON.toBinary(f) + val json = Serializers.ScalaJSON.toBinary(f) assert(new String(json) == """{"foo":"debasish"}""") - val fo = Serializer.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo] + val fo = Serializers.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo] assert(fo == f) } @Test def shouldSerializeTuple2 = { val message = MyMessage("id", ("hello", 34)) - val json = Serializer.ScalaJSON.toBinary(message) + val json = Serializers.ScalaJSON.toBinary(message) assert(new String(json) == """{"id":"id","value":{"hello":34}}""") - val f = Serializer.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage] + val f = Serializers.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage] assert(f == message) - val g = Serializer.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage] + val g = Serializers.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage] assert(f == message) } } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala index 1697367b33..f1308e3f0e 100644 --- a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -21,7 +21,7 @@ class Ticket435Spec extends object BinaryFormatMyStatefulActor { implicit object MyStatefulActorFormat extends Format[MyStatefulActor] { def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } diff --git a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala index d3b8e1f991..4e9e81bbaa 100644 --- a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala @@ -18,7 +18,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest { class MyTypedActorFormat extends Format[MyTypedActorImpl] { def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } @@ -28,7 +28,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest { class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] { def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] act.count1 = p.getCount1 act.count2 = p.getCount2 act diff --git a/akka-remote/src/test/scala/serialization/UntypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/UntypedActorSerializationSpec.scala index e33de43571..b62ebc1863 100644 --- a/akka-remote/src/test/scala/serialization/UntypedActorSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/UntypedActorSerializationSpec.scala @@ -20,7 +20,7 @@ class UntypedActorSerializationSpec extends class MyUntypedActorFormat extends Format[MyUntypedActor] { def fromBinary(bytes: Array[Byte], act: MyUntypedActor) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] act.count = p.getCount act } @@ -30,7 +30,7 @@ class UntypedActorSerializationSpec extends class MyUntypedActorWithDualCounterFormat extends Format[MyUntypedActorWithDualCounter] { def fromBinary(bytes: Array[Byte], act: MyUntypedActorWithDualCounter) = { - val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] act.count1 = p.getCount1 act.count2 = p.getCount2 act diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 034dec5178..9576338fdd 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -141,7 +141,7 @@ public class Pi { // create the workers final ActorRef[] workers = new ActorRef[nrOfWorkers]; for (int i = 0; i < nrOfWorkers; i++) { - workers[i] = actorOf(Worker.class).start(); + workers[i] = actorOf(Worker.class, "worker").start(); } // wrap them with a load-balancing router @@ -149,7 +149,7 @@ public class Pi { public UntypedActor create() { return new PiRouter(workers); } - }).start(); + }, "router").start(); } // message handler @@ -207,7 +207,7 @@ public class Pi { public UntypedActor create() { return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); } - }).start(); + }, "master").start(); // start the calculation master.sendOneWay(new Calculate()); diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 6397c09148..6d01b752b1 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -134,7 +134,7 @@ public class Pi { // create the workers final ActorRef[] workers = new ActorRef[nrOfWorkers]; for (int i = 0; i < nrOfWorkers; i++) { - workers[i] = actorOf(Worker.class).start(); + workers[i] = actorOf(Worker.class, "worker").start(); } // wrap them with a load-balancing router @@ -142,7 +142,7 @@ public class Pi { public UntypedActor create() { return new PiRouter(workers); } - }).start(); + }, "router").start(); } @Override @@ -202,7 +202,7 @@ public class Pi { public UntypedActor create() { return new Master(nrOfWorkers, nrOfMessages, nrOfElements); } - }).start(); + }, "worker").start(); // start the calculation long start = currentTimeMillis(); diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index cb9037476f..6972307919 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -272,7 +272,7 @@ abstract class TypedActor extends Actor with Proxyable { if (!unserializable && hasMutableArgument) { //FIXME serializeArguments - // val copyOfArgs = Serializer.Java.deepClone(args) + // val copyOfArgs = Serializers.Java.deepClone(args) // joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]]) joinPoint } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 25dda831ef..6d0338a8f9 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -45,6 +45,7 @@ akka { # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" # or: fully qualified class name of the router class # default is "direct"; + format = "akka.serializer.Format$Default$" clustered { # makes the actor available in the cluster registry # default (if omitted) is local non-clustered actor home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor