Misc fixes everywhere; deployment, serialization etc.

This commit is contained in:
Jonas Bonér 2011-05-16 09:47:23 +02:00
parent d50ab24b11
commit 6c6089e081
33 changed files with 329 additions and 221 deletions

4
.gitignore vendored
View file

@ -50,4 +50,6 @@ akka-docs/_build/
akka-tutorials/akka-tutorial-first/project/boot/ akka-tutorials/akka-tutorial-first/project/boot/
akka-tutorials/akka-tutorial-first/project/plugins/project/ akka-tutorials/akka-tutorial-first/project/plugins/project/
akka-docs/exts/ akka-docs/exts/
_akka_cluster/ _akka_cluster/
Makefile
akka.sublime-project

View file

@ -14,7 +14,14 @@ class DeployerSpec extends WordSpec with MustMatchers {
"be able to parse 'akka.actor.deployment._' config elements" in { "be able to parse 'akka.actor.deployment._' config elements" in {
val deployment = Deployer.lookupInConfig("service-pi") val deployment = Deployer.lookupInConfig("service-pi")
deployment must be ('defined) deployment must be ('defined)
deployment must equal (Some(Deploy("service-pi", RoundRobin, Clustered(Node("test-1"), Replicate(3), Stateless)))) deployment must equal (Some(Deploy(
"service-pi",
RoundRobin,
"akka.serializer.Format$Default$",
Clustered(
Node("test-1"),
Replicate(3),
Stateless))))
} }
} }
} }

View file

@ -3,6 +3,8 @@ package akka.actor;
import akka.japi.Creator; import akka.japi.Creator;
import akka.remoteinterface.RemoteSupport; import akka.remoteinterface.RemoteSupport;
import com.eaio.uuid.UUID;
/** /**
* JAVA API for * JAVA API for
* - creating actors, * - creating actors,
@ -28,6 +30,30 @@ public class Actors {
return Actor$.MODULE$.remote(); return Actor$.MODULE$.remote();
} }
/**
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
* <p/>
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor.
* Only use this method when you need to pass in constructor arguments into the 'UntypedActor'.
* <p/>
* You use it by implementing the UntypedActorFactory interface.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
* public UntypedActor create() {
* return new MyUntypedActor("service:name", 5);
* }
* }, "my-actor-address");
* actor.start();
* actor.sendOneWay(message, context);
* actor.stop();
* </pre>
*/
public static ActorRef actorOf(final Creator<Actor> factory, final String address) {
return Actor$.MODULE$.actorOf(factory, address);
}
/** /**
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference. * UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
@ -49,14 +75,32 @@ public class Actors {
* </pre> * </pre>
*/ */
public static ActorRef actorOf(final Creator<Actor> factory) { public static ActorRef actorOf(final Creator<Actor> factory) {
return Actor$.MODULE$.actorOf(factory); return Actor$.MODULE$.actorOf(factory, new UUID().toString());
} }
/** /**
* Creates an ActorRef out of the Actor type represented by the class provided. * Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java: * Example in Java:
* <pre> * <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class); * ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start();
* actor.sendOneWay(message, context);
* actor.stop();
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = Actors.actorOf(MyActor.class, "my-actor-address").start();
* </pre>
*/
public static ActorRef actorOf(final Class<? extends Actor> type, final String address) {
return Actor$.MODULE$.actorOf(type, address);
}
/**
* Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start(); * actor.start();
* actor.sendOneWay(message, context); * actor.sendOneWay(message, context);
* actor.stop(); * actor.stop();
@ -67,7 +111,7 @@ public class Actors {
* </pre> * </pre>
*/ */
public static ActorRef actorOf(final Class<? extends Actor> type) { public static ActorRef actorOf(final Class<? extends Actor> type) {
return Actor$.MODULE$.actorOf(type); return Actor$.MODULE$.actorOf(type, new UUID().toString());
} }
/** /**

View file

@ -11,11 +11,12 @@ import Helpers.{narrow, narrowSilently}
import akka.remoteinterface.RemoteSupport import akka.remoteinterface.RemoteSupport
import akka.japi.{Creator, Procedure} import akka.japi.{Creator, Procedure}
import akka.AkkaException import akka.AkkaException
import akka.serialization._
import akka.event.EventHandler
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.event.EventHandler
/** /**
* Life-cycle messages for the Actors * Life-cycle messages for the Actors
@ -145,8 +146,8 @@ object Actor extends ListenerManagement {
* val actor = actorOf[MyActor].start() * val actor = actorOf[MyActor].start()
* </pre> * </pre>
*/ */
def actorOf[T <: Actor : Manifest](address: String): ActorRef = def actorOf[T <: Actor : Manifest](address: String): ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address)
/** /**
* Creates an ActorRef out of the Actor with type T. * Creates an ActorRef out of the Actor with type T.
@ -163,8 +164,8 @@ object Actor extends ListenerManagement {
* val actor = actorOf[MyActor].start * val actor = actorOf[MyActor].start
* </pre> * </pre>
*/ */
def actorOf[T <: Actor : Manifest]: ActorRef = def actorOf[T <: Actor : Manifest]: ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString)
/** /**
* Creates an ActorRef out of the Actor of the specified Class. * Creates an ActorRef out of the Actor of the specified Class.
@ -181,7 +182,8 @@ object Actor extends ListenerManagement {
* val actor = actorOf(classOf[MyActor]).start() * val actor = actorOf(classOf[MyActor]).start()
* </pre> * </pre>
*/ */
def actorOf(clazz: Class[_ <: Actor]): ActorRef = actorOf(clazz, new UUID().toString) def actorOf[T <: Actor](clazz: Class[T]): ActorRef =
actorOf(clazz, new UUID().toString)
/** /**
* Creates an ActorRef out of the Actor of the specified Class. * Creates an ActorRef out of the Actor of the specified Class.
@ -197,28 +199,63 @@ object Actor extends ListenerManagement {
* val actor = actorOf(classOf[MyActor]).start * val actor = actorOf(classOf[MyActor]).start
* </pre> * </pre>
*/ */
def actorOf(clazz: Class[_ <: Actor], address: String): ActorRef = { def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = {
import DeploymentConfig._ import DeploymentConfig._
import ReflectiveAccess._
Address.validate(address) Address.validate(address)
try { try {
Deployer.deploymentFor(address) match { Deployer.deploymentFor(address) match {
case Deploy(_, router, Local) => case Deploy(_, router, _, Local) =>
// FIXME handle 'router' in 'Local' actors // FIXME handle 'router' in 'Local' actors
newLocalActorRef(clazz, address) newLocalActorRef(clazz, address)
case Deploy(_, router, Clustered(home, replication , state)) => case Deploy(_, router, formatClassName, Clustered(home, replication, state)) =>
sys.error("Clustered deployment not yet supported") ClusterModule.ensureEnabled()
/*
if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
val hostname = home match {
case Host(hostname) => hostname
case IP(address) => address
case Node(nodeName) => "localhost" // FIXME lookup hostname for node name
}
val replicas = replication match {
case Replicate(replicas) => replicas
case AutoReplicate => -1
case NoReplicas => 0
}
import ClusterModule.node
if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor
def formatErrorDueTo(reason: String) = {
throw new akka.config.ConfigurationException(
"Could not create Format[T] object [" + formatClassName +
"] for serialization of actor [" + address +
"] since " + reason)
}
implicit val format: Format[T] = {
if (formatClassName == "N/A") formatErrorDueTo("no class name defined in configuration")
val f = ReflectiveAccess.getObjectFor(formatClassName).getOrElse(formatErrorDueTo("it could not be loaded"))
if (f.isInstanceOf[Format[T]]) f.asInstanceOf[Format[T]]
else formatErrorDueTo("class must be of type [akka.serialization.Format[T]]")
}
if (!node.isClustered(address)) node.store(address, clazz, replicas, false)
node.use(address)
} else {
//val router =
node.ref(address, null)
}
sys.error("Clustered deployment not yet supported")
/*
val remoteAddress = Actor.remote.address val remoteAddress = Actor.remote.address
if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) { if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) {
// home node for actor // home node for actor
if (!node.isClustered(address)) node.store(clazz, address)
node.use(address).head
} else { } else {
val router =
node.ref(address, router)
} }
*/ */
/* /*
@ -231,9 +268,6 @@ object Actor extends ListenerManagement {
Misc stuff: Misc stuff:
- How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
- Deployer should:
1. Check if deployment exists in ZK
2. If not, upload it
- ClusterNode API and Actor.remote API should be made private[akka] - ClusterNode API and Actor.remote API should be made private[akka]
- Rewrite ClusterSpec or remove it - Rewrite ClusterSpec or remove it
- Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor
@ -275,7 +309,7 @@ object Actor extends ListenerManagement {
* val actor = actorOf(new MyActor).start() * val actor = actorOf(new MyActor).start()
* </pre> * </pre>
*/ */
def actorOf(factory: => Actor): ActorRef = actorOf(factory, new UUID().toString) def actorOf[T <: Actor](factory: => T): ActorRef = actorOf(factory, new UUID().toString)
/** /**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function * Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@ -295,7 +329,7 @@ object Actor extends ListenerManagement {
* val actor = actorOf(new MyActor).start * val actor = actorOf(new MyActor).start
* </pre> * </pre>
*/ */
def actorOf(factory: => Actor, address: String): ActorRef = { def actorOf[T <: Actor](factory: => T, address: String): ActorRef = {
Address.validate(address) Address.validate(address)
new LocalActorRef(() => factory, address) new LocalActorRef(() => factory, address)
} }
@ -309,7 +343,8 @@ object Actor extends ListenerManagement {
* This function should <b>NOT</b> be used for remote actors. * This function should <b>NOT</b> be used for remote actors.
* JAVA API * JAVA API
*/ */
def actorOf(creator: Creator[Actor]): ActorRef = actorOf(creator, new UUID().toString) def actorOf[T <: Actor](creator: Creator[T]): ActorRef =
actorOf(creator, new UUID().toString)
/** /**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>) * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
@ -319,7 +354,7 @@ object Actor extends ListenerManagement {
* This function should <b>NOT</b> be used for remote actors. * This function should <b>NOT</b> be used for remote actors.
* JAVA API * JAVA API
*/ */
def actorOf(creator: Creator[Actor], address: String): ActorRef = { def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = {
Address.validate(address) Address.validate(address)
new LocalActorRef(() => creator.create, address) new LocalActorRef(() => creator.create, address)
} }
@ -332,7 +367,7 @@ object Actor extends ListenerManagement {
* there is a method 'spawn[ActorType]' in the Actor trait already. * there is a method 'spawn[ActorType]' in the Actor trait already.
* Example: * Example:
* <pre> * <pre>
* import Actor.{spawn} * import Actor.spawn
* *
* spawn { * spawn {
* ... // do stuff * ... // do stuff

View file

@ -967,7 +967,7 @@ private[akka] case class RemoteActorRef private[akka] (
import DeploymentConfig._ import DeploymentConfig._
val remoteAddress = Deployer.deploymentFor(address) match { val remoteAddress = Deployer.deploymentFor(address) match {
case Deploy(_, _, Clustered(home, _, _)) => case Deploy(_, _, _, Clustered(home, _, _)) =>
val hostname = home match { val hostname = home match {
case Host(hostname) => hostname case Host(hostname) => hostname
case IP(address) => address case IP(address) => address

View file

@ -13,6 +13,7 @@ import java.util.{Set => JSet}
import akka.util.ReflectiveAccess._ import akka.util.ReflectiveAccess._
import akka.util.{ReflectiveAccess, ReadWriteGuard, ListenerManagement} import akka.util.{ReflectiveAccess, ReadWriteGuard, ListenerManagement}
import akka.serialization._
/** /**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -101,8 +102,9 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
/** /**
* Registers an actor in the Cluster ActorRegistry. * Registers an actor in the Cluster ActorRegistry.
*/ */
private[akka] def registerInCluster(address: String, actor: ActorRef) { private[akka] def registerInCluster[T <: Actor](
ClusterModule.node.store(address, actor) address: String, actor: ActorRef, replicas: Int, serializeMailbox: Boolean = false)(implicit format: Format[T]) {
ClusterModule.node.store(address, actor, replicas, serializeMailbox)
} }
/** /**

View file

@ -16,40 +16,6 @@ import akka.AkkaException
/** /**
* Programatic deployment configuration classes. Most values have defaults and can be left out. * Programatic deployment configuration classes. Most values have defaults and can be left out.
* <p/>
* Example Scala API:
* <pre>
* import akka.actor.DeploymentConfig._
*
* val deploymentHello = Deploy("service:hello", Local)
*
* val deploymentE = Deploy("service:e", AutoReplicate, Clustered(Host("darkstar.lan"), Stateful))
*
* val deploymentPi1 = Deploy("service:pi", Replicate(3), Clustered(Host("darkstar.lan"), Stateless(RoundRobin)))
*
* // same thing as 'deploymentPi1' but more explicit
* val deploymentPi2 =
* Deploy(
* address = "service:pi",
* replicas = 3,
* scope = Clustered(
* home = Host("darkstar.lan")
* state = Stateless(
* routing = RoundRobin
* )
* )
* )
* </pre>
* Example Java API:
* <pre>
* import static akka.actor.*;
*
* val deploymentHello = new Deploy("service:hello", new Local());
*
* val deploymentE = new Deploy("service:e", new AutoReplicate(), new Clustered(new Host("darkstar.lan"), new Stateful()));
*
* val deploymentPi1 = new Deploy("service:pi", new Replicate(3), new Clustered(new Host("darkstar.lan"), new Stateless(new RoundRobin())))
* </pre>
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ -58,7 +24,7 @@ object DeploymentConfig {
// -------------------------------- // --------------------------------
// --- Deploy // --- Deploy
// -------------------------------- // --------------------------------
case class Deploy(address: String, routing: Routing = Direct, scope: Scope = Local) case class Deploy(address: String, routing: Routing = Direct, format: String = "N/A", scope: Scope = Local)
// -------------------------------- // --------------------------------
// --- Routing // --- Routing
@ -179,7 +145,7 @@ object Deployer {
} }
def isLocal(deployment: Deploy): Boolean = deployment match { def isLocal(deployment: Deploy): Boolean = deployment match {
case Deploy(_, _, Local) => true case Deploy(_, _, _, Local) => true
case _ => false case _ => false
} }
@ -251,7 +217,7 @@ object Deployer {
// -------------------------------- // --------------------------------
val addressPath = "akka.actor.deployment." + address val addressPath = "akka.actor.deployment." + address
Config.config.getSection(addressPath) match { Config.config.getSection(addressPath) match {
case None => Some(Deploy(address, Direct, Local)) case None => Some(Deploy(address, Direct, "N/A", Local))
case Some(addressConfig) => case Some(addressConfig) =>
// -------------------------------- // --------------------------------
@ -275,12 +241,17 @@ object Deployer {
CustomRouter(customRouter) CustomRouter(customRouter)
} }
// --------------------------------
// akka.actor.deployment.<address>.format
// --------------------------------
val format = addressConfig.getString("format", "N/A")
// -------------------------------- // --------------------------------
// akka.actor.deployment.<address>.clustered // akka.actor.deployment.<address>.clustered
// -------------------------------- // --------------------------------
addressConfig.getSection("clustered") match { addressConfig.getSection("clustered") match {
case None => case None =>
Some(Deploy(address, router, Local)) // deploy locally Some(Deploy(address, router, "N/A", Local)) // deploy locally
case Some(clusteredConfig) => case Some(clusteredConfig) =>
@ -334,7 +305,7 @@ object Deployer {
if (clusteredConfig.getBool("stateless", false)) Stateless if (clusteredConfig.getBool("stateless", false)) Stateless
else Stateful else Stateful
Some(Deploy(address, router, Clustered(home, replicas, state))) Some(Deploy(address, router, format, Clustered(home, replicas, state)))
} }
} }
} }

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.serialization
import akka.actor.Actor
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
/**
* Type class definition for Actor Serialization
*/
trait FromBinary[T <: Actor] {
def fromBinary(bytes: Array[Byte], act: T): T
}
trait ToBinary[T <: Actor] {
def toBinary(t: T): Array[Byte]
}
// client needs to implement Format[] for the respective actor
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializer extends scala.Serializable {
@volatile var classLoader: Option[ClassLoader] = None
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
/**
* A default implementation for a stateless actor
*
* Create a Format object with the client actor as the implementation of the type class
*
* <pre>
* object BinaryFormatMyStatelessActor {
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
* }
* </pre>
*/
trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
def fromBinary(bytes: Array[Byte], act: T) = act
def toBinary(ac: T) = Array.empty[Byte]
}
/**
* A default implementation of the type class for a Format that specifies a serializer
*
* Create a Format object with the client actor as the implementation of the type class and
* a serializer object
*
* <pre>
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* val serializer = Serializers.Java
* }
* }
* </pre>
*/
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T]
def toBinary(ac: T) = serializer.toBinary(ac)
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import akka.AkkaException
class RoutingException(message: String) extends AkkaException(message)
sealed trait RouterType
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RouterType {
object Direct extends RouterType
object Random extends RouterType
object RoundRobin extends RouterType
}
// FIXME move all routing in cluster here when we can

View file

@ -8,6 +8,7 @@ import akka.actor._
import akka.config.Config._ import akka.config.Config._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.util.{ListenerManagement, ReflectiveAccess} import akka.util.{ListenerManagement, ReflectiveAccess}
import akka.serialization._
import akka.AkkaException import akka.AkkaException
/** /**
@ -89,6 +90,8 @@ object EventHandler extends ListenerManagement {
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
val level: Int = config.getString("akka.event-handler-level", "INFO") match { val level: Int = config.getString("akka.event-handler-level", "INFO") match {
case "ERROR" => ErrorLevel case "ERROR" => ErrorLevel
case "WARNING" => WarningLevel case "WARNING" => WarningLevel
@ -107,8 +110,7 @@ object EventHandler extends ListenerManagement {
defaultListeners foreach { listenerName => defaultListeners foreach { listenerName =>
try { try {
ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz => ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz =>
val listener = Actor.actorOf(clazz, listenerName).start() addListener(Actor.actorOf(clazz, listenerName).start())
addListener(listener)
} }
} catch { } catch {
case e: akka.actor.DeploymentAlreadyBoundException => // do nothing case e: akka.actor.DeploymentAlreadyBoundException => // do nothing

View file

@ -8,6 +8,7 @@ import akka.japi.Creator
import akka.actor._ import akka.actor._
import akka.util._ import akka.util._
import akka.dispatch.CompletableFuture import akka.dispatch.CompletableFuture
import akka.serialization._
import akka.AkkaException import akka.AkkaException
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
@ -137,7 +138,9 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
val eventHandler: ActorRef = { val eventHandler: ActorRef = {
val handler = Actor.actorOf[RemoteEventHandler](classOf[RemoteEventHandler].getName).start() implicit object format extends StatelessActorFormat[RemoteEventHandler]
val clazz = classOf[RemoteEventHandler]
val handler = Actor.actorOf(clazz, clazz.getName).start()
// add the remote client and server listener that pipes the events to the event handler system // add the remote client and server listener that pipes the events to the event handler system
addListener(handler) addListener(handler)
handler handler

View file

@ -12,6 +12,7 @@ import akka.remoteinterface.RemoteSupport
import akka.actor._ import akka.actor._
import akka.event.EventHandler import akka.event.EventHandler
import akka.actor.DeploymentConfig.Deploy import akka.actor.DeploymentConfig.Deploy
import akka.serialization.Format
/** /**
* Helper class for reflective access to different modules in order to allow optional loading of modules. * Helper class for reflective access to different modules in order to allow optional loading of modules.
@ -64,11 +65,19 @@ object ReflectiveAccess {
} }
type ClusterNode = { type ClusterNode = {
def nrOfActors: Int def store[T <: Actor]
def store[T <: Actor](address: String, actorRef: ActorRef) (address: String, actorClass: Class[T], replicas: Int, serializeMailbox: Boolean)
// (implicit format: Format[T]) (implicit format: Format[T])
def store[T <: Actor]
(address: String, actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean)
(implicit format: Format[T])
def remove(address: String) def remove(address: String)
def use(address: String): Option[ActorRef] def use(address: String): Array[ActorRef]
def ref(address: String, router: RouterType): ActorRef
def isClustered(address: String): Boolean
def nrOfActors: Int
} }
type ClusterDeployer = { type ClusterDeployer = {

View file

@ -33,7 +33,7 @@ import akka.event.EventHandler
import akka.dispatch.{Dispatchers, Future} import akka.dispatch.{Dispatchers, Future}
import akka.remoteinterface._ import akka.remoteinterface._
import akka.config.Config._ import akka.config.Config._
import akka.serialization.{Format, Serializer} import akka.serialization.{Format, Serializers}
import akka.serialization.Compression.LZF import akka.serialization.Compression.LZF
import akka.AkkaException import akka.AkkaException
@ -413,7 +413,7 @@ class ClusterNode private[akka] (
val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry" val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry"
val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations" val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations"
val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids" val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids"
val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address" val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address"
val baseNodes = List( val baseNodes = List(
CLUSTER_NODE, CLUSTER_NODE,
MEMBERSHIP_NODE, MEMBERSHIP_NODE,
@ -852,7 +852,7 @@ class ClusterNode private[akka] (
/** /**
* Creates an ActorRef with a Router to a set of clustered actors. * Creates an ActorRef with a Router to a set of clustered actors.
*/ */
def ref(actorAddress: String, router: Router.RouterType): ActorRef = if (isConnected.isOn) { def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress) val addresses = addressesForActor(actorAddress)
val actorType = ActorType.ScalaActor // FIXME later we also want to suppot TypedActor, then 'actorType' needs to be configurable val actorType = ActorType.ScalaActor // FIXME later we also want to suppot TypedActor, then 'actorType' needs to be configurable
@ -1031,7 +1031,7 @@ class ClusterNode private[akka] (
def send(f: Function0[Unit], replicationFactor: Int): Unit = { def send(f: Function0[Unit], replicationFactor: Int): Unit = {
val message = RemoteDaemonMessageProtocol.newBuilder val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_UNIT) .setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(Serializer.Java.toBinary(f))) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
.build .build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
} }
@ -1043,7 +1043,7 @@ class ClusterNode private[akka] (
def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = { def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
val message = RemoteDaemonMessageProtocol.newBuilder val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_ANY) .setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(Serializer.Java.toBinary(f))) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
.build .build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message) val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message)
results.toList.asInstanceOf[List[Future[Any]]] results.toList.asInstanceOf[List[Future[Any]]]
@ -1056,7 +1056,7 @@ class ClusterNode private[akka] (
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = { def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = {
val message = RemoteDaemonMessageProtocol.newBuilder val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_UNIT) .setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(Serializer.Java.toBinary((f, arg)))) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
.build .build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
} }
@ -1069,7 +1069,7 @@ class ClusterNode private[akka] (
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = { def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
val message = RemoteDaemonMessageProtocol.newBuilder val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_ANY) .setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(Serializer.Java.toBinary((f, arg)))) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
.build .build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message) val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ !!! message)
results.toList.asInstanceOf[List[Future[Any]]] results.toList.asInstanceOf[List[Future[Any]]]
@ -1290,7 +1290,7 @@ class ClusterNode private[akka] (
val to = remoteServerAddress val to = remoteServerAddress
val command = RemoteDaemonMessageProtocol.newBuilder val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS) .setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(Serializer.Java.toBinary((from, to)))) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to))))
.build .build
membershipNodes foreach { node => membershipNodes foreach { node =>
replicaConnections.get(node) foreach { case (_, connection) => replicaConnections.get(node) foreach { case (_, connection) =>
@ -1587,6 +1587,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
} }
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serializer.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T] Serializers.Java.fromBinary(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T]
} }
} }

View file

@ -69,8 +69,9 @@ object ClusterDeployer {
private val systemDeployments = List( private val systemDeployments = List(
Deploy( Deploy(
RemoteClusterDaemon.ADDRESS, Direct, address = RemoteClusterDaemon.ADDRESS,
Clustered(Deployer.defaultAddress, NoReplicas, Stateless)) routing = Direct,
scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
) )
private[akka] def init(deployments: List[Deploy]) { private[akka] def init(deployments: List[Deploy]) {

View file

@ -7,6 +7,7 @@ import Cluster._
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor.RouterType._
import akka.dispatch.Future import akka.dispatch.Future
import akka.AkkaException import akka.AkkaException
@ -20,30 +21,25 @@ class RoutingException(message: String) extends AkkaException(message)
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Router { object Router {
sealed trait RouterType
object Direct extends RouterType
object Random extends RouterType
object RoundRobin extends RouterType
def newRouter( def newRouter(
routerType: RouterType, routerType: RouterType,
addresses: Array[Tuple2[UUID, InetSocketAddress]], addresses: Array[Tuple2[UUID, InetSocketAddress]],
address: String, serviceId: String,
timeout: Long, timeout: Long,
actorType: ActorType, actorType: ActorType,
replicationStrategy: ReplicationStrategy = ReplicationStrategy.WriteThrough): ClusterActorRef = { replicationStrategy: ReplicationStrategy = ReplicationStrategy.WriteThrough): ClusterActorRef = {
routerType match { routerType match {
case Direct => new ClusterActorRef( case Direct => new ClusterActorRef(
addresses, address, timeout, addresses, serviceId, timeout,
actorType, replicationStrategy) with Direct actorType, replicationStrategy) with Direct
case Random => new ClusterActorRef( case Random => new ClusterActorRef(
addresses, address, timeout, addresses, serviceId, timeout,
actorType, replicationStrategy) with Random actorType, replicationStrategy) with Random
case RoundRobin => new ClusterActorRef( case RoundRobin => new ClusterActorRef(
addresses, address, timeout, addresses, serviceId, timeout,
actorType, replicationStrategy) with RoundRobin actorType, replicationStrategy) with RoundRobin
} }
} }

View file

@ -8,7 +8,7 @@ import org.I0Itec.zkclient._
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.serialization.{Serializer, SerializerBasedActorFormat} import akka.serialization.{Serializers, SerializerBasedActorFormat}
import akka.util.Helpers._ import akka.util.Helpers._
import akka.actor.DeploymentConfig._ import akka.actor.DeploymentConfig._
@ -30,7 +30,7 @@ class MyJavaSerializableActor extends Actor with Serializable {
object BinaryFormatMyJavaSerializableActor { object BinaryFormatMyJavaSerializableActor {
implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] with Serializable { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] with Serializable {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
} }
@ -787,7 +787,7 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with
node2.stop node2.stop
node3.stop node3.stop
} }
*/
"be able to create a reference to a replicated actor by address using Router.RoundRobin routing" in { "be able to create a reference to a replicated actor by address using Router.RoundRobin routing" in {
// create actor // create actor
val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start
@ -830,6 +830,8 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with
node2.stop node2.stop
node3.stop node3.stop
} }
*/
} }
override def beforeAll() = { override def beforeAll() = {

View file

@ -8,7 +8,7 @@ import akka.cluster._
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.serialization.{Serializer, SerializerBasedActorFormat} import akka.serialization.{Serializers, SerializerBasedActorFormat}
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
@ -67,15 +67,16 @@ object PingPong {
object BinaryFormats { object BinaryFormats {
implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable { implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable { implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
} }
} }
/*
object ClusteredPingPongSample { object ClusteredPingPongSample {
import PingPong._ import PingPong._
import BinaryFormats._ import BinaryFormats._
@ -145,3 +146,4 @@ object ClusteredPingPongSample {
Cluster.shutdownLocalCluster() Cluster.shutdownLocalCluster()
} }
} }
*/

View file

@ -7,7 +7,7 @@ package example.cluster
import akka.cluster._ import akka.cluster._
import akka.actor._ import akka.actor._
import akka.serialization.{Serializer, SerializerBasedActorFormat} import akka.serialization.{Serializers, SerializerBasedActorFormat}
import akka.util.duration._ import akka.util.duration._
object PingPong { object PingPong {
@ -69,15 +69,16 @@ object PingPong {
object BinaryFormats { object BinaryFormats {
implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable { implicit object PingActorFormat extends SerializerBasedActorFormat[PingActor] with Serializable {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable { implicit object PongActorFormat extends SerializerBasedActorFormat[PongActor] with Serializable {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
} }
} }
/*
object PingPongMultiJvmNode1 { object PingPongMultiJvmNode1 {
import PingPong._ import PingPong._
import BinaryFormats._ import BinaryFormats._
@ -238,4 +239,4 @@ class PongNode(number: Int) {
node.stop node.stop
} }
} }
*/

View file

@ -61,7 +61,7 @@ Step 2: Implement the type class for the actor
object BinaryFormatMyActor { object BinaryFormatMyActor {
implicit object MyActorFormat extends Format[MyActor] { implicit object MyActorFormat extends Format[MyActor] {
def fromBinary(bytes: Array[Byte], act: MyActor) = { def fromBinary(bytes: Array[Byte], act: MyActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }
@ -170,7 +170,7 @@ Create a module for the type class ..
object BinaryFormatMyJavaSerializableActor { object BinaryFormatMyJavaSerializableActor {
implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
} }
@ -258,7 +258,7 @@ Step 2: Implement the type class for the actor
class MyTypedActorFormat extends Format[MyTypedActorImpl] { class MyTypedActorFormat extends Format[MyTypedActorImpl] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }
@ -316,7 +316,7 @@ If you are sending messages to a remote Actor and these messages implement one o
Each serialization interface/trait in Each serialization interface/trait in
* akka.serialization.Serializable.* * akka.serialization.Serializable.*
> has a matching serializer in > has a matching serializer in
* akka.serialization.Serializer.* * akka.serialization.Serializers.*
Note however that if you are using one of the Serializable interfaces then you dont have to do anything else in regard to sending remote messages. Note however that if you are using one of the Serializable interfaces then you dont have to do anything else in regard to sending remote messages.
@ -326,7 +326,7 @@ The ones currently supported are (besides the default which is regular Java seri
* SBinary (Scala only) * SBinary (Scala only)
* Protobuf (Scala and Java) * Protobuf (Scala and Java)
Apart from the above, Akka also supports Scala object serialization through `SJSON <http://github.com/debasishg/sjson/tree/master>`_ that implements APIs similar to 'akka.serialization.Serializer.*'. See the section on SJSON below for details. Apart from the above, Akka also supports Scala object serialization through `SJSON <http://github.com/debasishg/sjson/tree/master>`_ that implements APIs similar to 'akka.serialization.Serializers.*'. See the section on SJSON below for details.
Protobuf Protobuf
-------- --------
@ -400,7 +400,7 @@ For your POJOs to be able to serialize themselves you have to extend the ScalaJS
s.fromJSON(s.toJSON) should equal(s) s.fromJSON(s.toJSON) should equal(s)
} }
Use akka.serialization.Serializer.ScalaJSON to do generic JSON serialization, e.g. serialize object that does not extend ScalaJSON using the JSON serializer. Serialization using Serializer can be done in two ways :- Use akka.serialization.Serializers.ScalaJSON to do generic JSON serialization, e.g. serialize object that does not extend ScalaJSON using the JSON serializer. Serialization using Serializer can be done in two ways :-
1. Type class based serialization (recommended) 1. Type class based serialization (recommended)
2. Reflection based serialization 2. Reflection based serialization
@ -430,7 +430,7 @@ Here are the steps that you need to follow:
.. code-block:: scala .. code-block:: scala
import akka.serialization.Serializer.ScalaJSON import akka.serialization.Serializers.ScalaJSON
val o = MyMessage("dg", ("akka", 100)) val o = MyMessage("dg", ("akka", 100))
fromjson[MyMessage](tojson(o)) should equal(o) fromjson[MyMessage](tojson(o)) should equal(o)
@ -451,26 +451,26 @@ You can also use the Serializer abstraction to serialize using the reflection ba
} }
val foo = new Foo("bar") val foo = new Foo("bar")
val json = Serializer.ScalaJSON.out(foo) val json = Serializers.ScalaJSON.out(foo)
val fooCopy = Serializer.ScalaJSON.in(json) // returns a JsObject as an AnyRef val fooCopy = Serializers.ScalaJSON.in(json) // returns a JsObject as an AnyRef
val fooCopy2 = Serializer.ScalaJSON.in(new String(json)) // can also take a string as input val fooCopy2 = Serializers.ScalaJSON.in(new String(json)) // can also take a string as input
val fooCopy3 = Serializer.ScalaJSON.in[Foo](json).asInstanceOf[Foo] val fooCopy3 = Serializers.ScalaJSON.in[Foo](json).asInstanceOf[Foo]
Classes without a @BeanInfo annotation cannot be serialized as JSON. Classes without a @BeanInfo annotation cannot be serialized as JSON.
So if you see something like that: So if you see something like that:
.. code-block:: scala .. code-block:: scala
scala> Serializer.ScalaJSON.out(bar) scala> Serializers.ScalaJSON.out(bar)
Serializer.ScalaJSON.out(bar) Serializers.ScalaJSON.out(bar)
java.lang.UnsupportedOperationException: Class class Bar not supported for conversion java.lang.UnsupportedOperationException: Class class Bar not supported for conversion
at sjson.json.JsBean$class.toJSON(JsBean.scala:210) at sjson.json.JsBean$class.toJSON(JsBean.scala:210)
at sjson.json.Serializer$SJSON$.toJSON(Serializer.scala:107) at sjson.json.Serializer$SJSON$.toJSON(Serializers.scala:107)
at sjson.json.Serializer$SJSON$class.out(Serializer.scala:37) at sjson.json.Serializer$SJSON$class.out(Serializers.scala:37)
at sjson.json.Serializer$SJSON$.out(Serializer.scala:107) at sjson.json.Serializer$SJSON$.out(Serializers.scala:107)
at akka.serialization.Serializer$ScalaJSON... at akka.serialization.Serializer$ScalaJSON...
it means, that you haven't got a @BeanInfo annotation on your class. it means, that you haven't got a @BeanInfo annotation on your class.
@ -747,8 +747,8 @@ and the serialization code like the following:
object TestSerialize{ object TestSerialize{
def main(args: Array[String]) { def main(args: Array[String]) {
val test1 = new D(List(B("hello1"))) val test1 = new D(List(B("hello1")))
val json = sjson.json.Serializer.SJSON.out(test1) val json = sjson.json.Serializers.SJSON.out(test1)
val res = sjson.json.Serializer.SJSON.in[D](json) val res = sjson.json.Serializers.SJSON.in[D](json)
val res1: D = res.asInstanceOf[D] val res1: D = res.asInstanceOf[D]
println(res1) println(res1)
} }
@ -931,15 +931,15 @@ SBinary: Scala
To serialize Scala structures you can use SBinary serializer. SBinary can serialize all primitives and most default Scala datastructures; such as List, Tuple, Map, Set, BigInt etc. To serialize Scala structures you can use SBinary serializer. SBinary can serialize all primitives and most default Scala datastructures; such as List, Tuple, Map, Set, BigInt etc.
Here is an example of using the akka.serialization.Serializer.SBinary serializer to serialize standard Scala library objects. Here is an example of using the akka.serialization.Serializers.SBinary serializer to serialize standard Scala library objects.
.. code-block:: scala .. code-block:: scala
import akka.serialization.Serializer import akka.serialization.Serializer
import sbinary.DefaultProtocol._ // you always need to import these implicits import sbinary.DefaultProtocol._ // you always need to import these implicits
val users = List(("user1", "passwd1"), ("user2", "passwd2"), ("user3", "passwd3")) val users = List(("user1", "passwd1"), ("user2", "passwd2"), ("user3", "passwd3"))
val bytes = Serializer.SBinary.out(users) val bytes = Serializers.SBinary.out(users)
val usersCopy = Serializer.SBinary.in(bytes, Some(classOf[List[Tuple2[String,String]]])) val usersCopy = Serializers.SBinary.in(bytes, Some(classOf[List[Tuple2[String,String]]]))
If you need to serialize your own user-defined objects then you have to do three things: If you need to serialize your own user-defined objects then you have to do three things:
# Define an empty constructor # Define an empty constructor

View file

@ -1 +0,0 @@
se.scalablesolutions.akka.rest.ListWriter

View file

@ -4,17 +4,17 @@
package akka.remote package akka.remote
import akka.serialization.{Serializer, Serializable} import akka.serialization.{Serializers, Serializable}
import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol._
import akka.util._ import akka.util._
import com.google.protobuf.{Message, ByteString} import com.google.protobuf.{Message, ByteString}
object MessageSerializer { object MessageSerializer {
private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java private def SERIALIZER_JAVA: Serializers.Java = Serializers.Java
private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON private def SERIALIZER_JAVA_JSON: Serializers.JavaJSON = Serializers.JavaJSON
private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON private def SERIALIZER_SCALA_JSON: Serializers.ScalaJSON = Serializers.ScalaJSON
private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf private def SERIALIZER_PROTOBUF: Serializers.Protobuf = Serializers.Protobuf
def setClassLoader(cl: ClassLoader) = { def setClassLoader(cl: ClassLoader) = {
val someCl = Some(cl) val someCl = Some(cl)

View file

@ -19,58 +19,6 @@ import akka.util.ReflectiveAccess
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.remote. {RemoteClientSettings, MessageSerializer} import akka.remote. {RemoteClientSettings, MessageSerializer}
/**
* Type class definition for Actor Serialization
*/
trait FromBinary[T <: Actor] {
def fromBinary(bytes: Array[Byte], act: T): T
}
trait ToBinary[T <: Actor] {
def toBinary(t: T): Array[Byte]
}
// client needs to implement Format[] for the respective actor
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
/**
* A default implementation for a stateless actor
*
* Create a Format object with the client actor as the implementation of the type class
*
* <pre>
* object BinaryFormatMyStatelessActor {
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
* }
* </pre>
*/
trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
def fromBinary(bytes: Array[Byte], act: T) = act
def toBinary(ac: T) = Array.empty[Byte]
}
/**
* A default implementation of the type class for a Format that specifies a serializer
*
* Create a Format object with the client actor as the implementation of the type class and
* a serializer object
*
* <pre>
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* val serializer = Serializer.Java
* }
* }
* </pre>
*/
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T]
def toBinary(ac: T) = serializer.toBinary(ac)
}
/** /**
* Module for local actor serialization. * Module for local actor serialization.
@ -140,7 +88,7 @@ object ActorSerialization {
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
lifeCycleProtocol.foreach(builder.setLifeCycle(_)) lifeCycleProtocol.foreach(builder.setLifeCycle(_))
actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializer.Java.toBinary(actorRef.hotswap))) if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap)))
builder.build builder.build
} }

View file

@ -14,20 +14,9 @@ import org.codehaus.jackson.map.ObjectMapper
import sjson.json.{Serializer => SJSONSerializer} import sjson.json.{Serializer => SJSONSerializer}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializer extends scala.Serializable {
@volatile var classLoader: Option[ClassLoader] = None
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
// For Java API // For Java API
class SerializerFactory { class SerializerFactory {
import Serializer._ import Serializers._
def getJava: Java.type = Java def getJava: Java.type = Java
def getJavaJSON: JavaJSON.type = JavaJSON def getJavaJSON: JavaJSON.type = JavaJSON
def getScalaJSON: ScalaJSON.type = ScalaJSON def getScalaJSON: ScalaJSON.type = ScalaJSON
@ -37,7 +26,7 @@ class SerializerFactory {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Serializer { object Serializers {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
object NOOP extends NOOP object NOOP extends NOOP

View file

@ -6,7 +6,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith import org.junit.runner.RunWith
import akka.serialization.Serializer.ScalaJSON import akka.serialization.Serializers.ScalaJSON
//TODO: FIXME WHY IS THIS COMMENTED OUT? //TODO: FIXME WHY IS THIS COMMENTED OUT?
object Protocols { object Protocols {

View file

@ -22,7 +22,7 @@ class SerializableTypeClassActorSpec extends
object BinaryFormatMyActor { object BinaryFormatMyActor {
implicit object MyActorFormat extends Format[MyActor] { implicit object MyActorFormat extends Format[MyActor] {
def fromBinary(bytes: Array[Byte], act: MyActor) = { def fromBinary(bytes: Array[Byte], act: MyActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }
@ -34,7 +34,7 @@ class SerializableTypeClassActorSpec extends
object BinaryFormatMyActorWithDualCounter { object BinaryFormatMyActorWithDualCounter {
implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] { implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] {
def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = { def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
act.count1 = p.getCount1 act.count1 = p.getCount1
act.count2 = p.getCount2 act.count2 = p.getCount2
act act
@ -58,7 +58,7 @@ class SerializableTypeClassActorSpec extends
object BinaryFormatMyJavaSerializableActor { object BinaryFormatMyJavaSerializableActor {
implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
val serializer = Serializer.Java val serializer = Serializers.Java
} }
} }

View file

@ -20,20 +20,20 @@ class SerializerSpec extends JUnitSuite {
@Test @Test
def shouldSerializeString = { def shouldSerializeString = {
val f = Foo("debasish") val f = Foo("debasish")
val json = Serializer.ScalaJSON.toBinary(f) val json = Serializers.ScalaJSON.toBinary(f)
assert(new String(json) == """{"foo":"debasish"}""") assert(new String(json) == """{"foo":"debasish"}""")
val fo = Serializer.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo] val fo = Serializers.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo]
assert(fo == f) assert(fo == f)
} }
@Test @Test
def shouldSerializeTuple2 = { def shouldSerializeTuple2 = {
val message = MyMessage("id", ("hello", 34)) val message = MyMessage("id", ("hello", 34))
val json = Serializer.ScalaJSON.toBinary(message) val json = Serializers.ScalaJSON.toBinary(message)
assert(new String(json) == """{"id":"id","value":{"hello":34}}""") assert(new String(json) == """{"id":"id","value":{"hello":34}}""")
val f = Serializer.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage] val f = Serializers.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage]
assert(f == message) assert(f == message)
val g = Serializer.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage] val g = Serializers.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage]
assert(f == message) assert(f == message)
} }
} }

View file

@ -21,7 +21,7 @@ class Ticket435Spec extends
object BinaryFormatMyStatefulActor { object BinaryFormatMyStatefulActor {
implicit object MyStatefulActorFormat extends Format[MyStatefulActor] { implicit object MyStatefulActorFormat extends Format[MyStatefulActor] {
def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = { def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }

View file

@ -18,7 +18,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest {
class MyTypedActorFormat extends Format[MyTypedActorImpl] { class MyTypedActorFormat extends Format[MyTypedActorImpl] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }
@ -28,7 +28,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest {
class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] { class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = { def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
act.count1 = p.getCount1 act.count1 = p.getCount1
act.count2 = p.getCount2 act.count2 = p.getCount2
act act

View file

@ -20,7 +20,7 @@ class UntypedActorSerializationSpec extends
class MyUntypedActorFormat extends Format[MyUntypedActor] { class MyUntypedActorFormat extends Format[MyUntypedActor] {
def fromBinary(bytes: Array[Byte], act: MyUntypedActor) = { def fromBinary(bytes: Array[Byte], act: MyUntypedActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount act.count = p.getCount
act act
} }
@ -30,7 +30,7 @@ class UntypedActorSerializationSpec extends
class MyUntypedActorWithDualCounterFormat extends Format[MyUntypedActorWithDualCounter] { class MyUntypedActorWithDualCounterFormat extends Format[MyUntypedActorWithDualCounter] {
def fromBinary(bytes: Array[Byte], act: MyUntypedActorWithDualCounter) = { def fromBinary(bytes: Array[Byte], act: MyUntypedActorWithDualCounter) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
act.count1 = p.getCount1 act.count1 = p.getCount1
act.count2 = p.getCount2 act.count2 = p.getCount2
act act

View file

@ -141,7 +141,7 @@ public class Pi {
// create the workers // create the workers
final ActorRef[] workers = new ActorRef[nrOfWorkers]; final ActorRef[] workers = new ActorRef[nrOfWorkers];
for (int i = 0; i < nrOfWorkers; i++) { for (int i = 0; i < nrOfWorkers; i++) {
workers[i] = actorOf(Worker.class).start(); workers[i] = actorOf(Worker.class, "worker").start();
} }
// wrap them with a load-balancing router // wrap them with a load-balancing router
@ -149,7 +149,7 @@ public class Pi {
public UntypedActor create() { public UntypedActor create() {
return new PiRouter(workers); return new PiRouter(workers);
} }
}).start(); }, "router").start();
} }
// message handler // message handler
@ -207,7 +207,7 @@ public class Pi {
public UntypedActor create() { public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
} }
}).start(); }, "master").start();
// start the calculation // start the calculation
master.sendOneWay(new Calculate()); master.sendOneWay(new Calculate());

View file

@ -134,7 +134,7 @@ public class Pi {
// create the workers // create the workers
final ActorRef[] workers = new ActorRef[nrOfWorkers]; final ActorRef[] workers = new ActorRef[nrOfWorkers];
for (int i = 0; i < nrOfWorkers; i++) { for (int i = 0; i < nrOfWorkers; i++) {
workers[i] = actorOf(Worker.class).start(); workers[i] = actorOf(Worker.class, "worker").start();
} }
// wrap them with a load-balancing router // wrap them with a load-balancing router
@ -142,7 +142,7 @@ public class Pi {
public UntypedActor create() { public UntypedActor create() {
return new PiRouter(workers); return new PiRouter(workers);
} }
}).start(); }, "router").start();
} }
@Override @Override
@ -202,7 +202,7 @@ public class Pi {
public UntypedActor create() { public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements); return new Master(nrOfWorkers, nrOfMessages, nrOfElements);
} }
}).start(); }, "worker").start();
// start the calculation // start the calculation
long start = currentTimeMillis(); long start = currentTimeMillis();

View file

@ -272,7 +272,7 @@ abstract class TypedActor extends Actor with Proxyable {
if (!unserializable && hasMutableArgument) { if (!unserializable && hasMutableArgument) {
//FIXME serializeArguments //FIXME serializeArguments
// val copyOfArgs = Serializer.Java.deepClone(args) // val copyOfArgs = Serializers.Java.deepClone(args)
// joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]]) // joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
joinPoint joinPoint
} }

View file

@ -45,6 +45,7 @@ akka {
# available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages"
# or: fully qualified class name of the router class # or: fully qualified class name of the router class
# default is "direct"; # default is "direct";
format = "akka.serializer.Format$Default$"
clustered { # makes the actor available in the cluster registry clustered { # makes the actor available in the cluster registry
# default (if omitted) is local non-clustered actor # default (if omitted) is local non-clustered actor
home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor