Stateless and Stateful Actor serialization + Turned on class caching in Active Object

This commit is contained in:
Jonas Bonér 2010-06-18 13:31:26 +02:00
commit 35ae27780d
7 changed files with 142 additions and 38 deletions

View file

@ -354,7 +354,7 @@ object ActiveObject extends Logging {
}
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, false)
val proxy = Proxy.newInstance(target, true, false)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
actorRef.timeout = timeout
@ -367,7 +367,7 @@ object ActiveObject extends Logging {
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
@ -461,7 +461,7 @@ object ActiveObject extends Logging {
val parent = clazz.getSuperclass
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
log.trace(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None

View file

@ -34,27 +34,59 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
self.makeRemote(hostname, port)
}
/**
* Base trait defining a serializable actor.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait SerializableActor extends Actor
/**
* Base trait defining a stateless serializable actor.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatelessSerializableActor extends SerializableActor
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait SerializableActor extends Actor {
val serializer: Serializer
trait StatefulSerializableActor extends SerializableActor {
def toBinary: Array[Byte]
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol. This actor <b>is</b> the serialized state.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatefulSerializerSerializableActor extends StatefulSerializableActor {
val serializer: Serializer
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol. This actor <b>is wrapping</b> serializable state.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait StatefulWrappedSerializableActor extends StatefulSerializableActor {
def fromBinary(bytes: Array[Byte])
}
/**
* Mix in this trait to create a serializable actor, serializable through
* Protobuf.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
val serializer = Serializer.Protobuf
trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor {
def toBinary: Array[Byte] = toProtobuf.toByteArray
def fromBinary(bytes: Array[Byte]) = fromProtobuf(serializer.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
val clazz: Class[T]
def toProtobuf: T
@ -67,7 +99,7 @@ trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaSerializableActor extends SerializableActor {
trait JavaSerializableActor extends StatefulSerializerSerializableActor {
@transient val serializer = Serializer.Java
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -78,7 +110,7 @@ trait JavaSerializableActor extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaJSONSerializableActor extends SerializableActor {
trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.JavaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -89,7 +121,7 @@ trait JavaJSONSerializableActor extends SerializableActor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ScalaJSONSerializableActor extends SerializableActor {
trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor {
val serializer = Serializer.ScalaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}

View file

@ -74,7 +74,8 @@ object ActorRef {
/**
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassname,
@ -82,6 +83,7 @@ object ActorRef {
protocol.getHomeAddress.getPort,
protocol.getTimeout,
loader)
}
/**
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
@ -99,11 +101,15 @@ object ActorRef {
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
*/
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
val serializer = if (protocol.hasSerializerClassname) {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
Some(serializerClass.newInstance.asInstanceOf[Serializer])
} else None
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
@ -120,8 +126,9 @@ object ActorRef {
if (protocol.hasSupervisor)
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (protocol.hasHotswapStack) Some(serializer
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
@ -339,10 +346,12 @@ trait ActorRef extends TransactionManagement {
/**
* Returns the 'Serializer' instance for the Actor as an Option.
* <p/>
* It returns 'Some(serializer)' if the Actor is serializable and 'None' if not.
* It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor
* trait (which has a Serializer defined) and 'None' if not.
*/
def serializer: Option[Serializer] =
if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer)
if (actor.isInstanceOf[StatefulSerializerSerializableActor])
Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer)
else None
/**
@ -694,15 +703,25 @@ sealed class LocalActorRef private[akka](
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader,
__serializer: Serializer) = {
__serializer: Option[Serializer]) = {
this(() => {
val actorClass = __loader.loadClass(__actorClassName)
val actorInstance = actorClass.newInstance
if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) {
val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]]
if (actorInstance.isInstanceOf[StatelessSerializableActor]) {
actorInstance.asInstanceOf[Actor]
} else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) {
__serializer
.getOrElse(throw new IllegalStateException("No serializer defined for SerializableActor [" + actorClass.getName + "]"))
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
} else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) {
val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor]
instance.fromBinary(__actorBytes)
instance
} else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
} else throw new IllegalStateException(
"Can't deserialize Actor that is not an instance of one of:\n" +
"\n\t- StatelessSerializableActor" +
"\n\t- StatefulSerializerSerializableActor" +
"\n\t- StatefulWrappedSerializableActor")
})
loader = Some(__loader)
isDeserialized = true
@ -761,7 +780,8 @@ sealed class LocalActorRef private[akka](
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
if (!isSerializable) throw new IllegalStateException(
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
"Can't serialize an ActorRef using SerializedActorRefProtocol" +
"\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
@ -782,23 +802,19 @@ sealed class LocalActorRef private[akka](
}
}
val serializerClassname = serializer
.getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined"))
.getClass.getName
val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(uuid)
.setId(id)
.setActorClassname(actorClass.getName)
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary))
.setSerializerClassname(serializerClassname)
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[StatefulSerializableActor].toBinary))
.setOriginalAddress(originalAddress)
.setIsTransactor(isTransactor)
.setTimeout(timeout)
serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol))
supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol))
// FIXME: how to serialize the hotswap PartialFunction ??
// hotswap.foreach(builder.setHotswapStack(_))
builder.build
@ -813,8 +829,10 @@ sealed class LocalActorRef private[akka](
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte] = {
if (isSerializable) toSerializedActorRefProtocol.toByteArray
else toRemoteActorRefProtocol.toByteArray
val protocol = if (isSerializable) toSerializedActorRefProtocol
else toRemoteActorRefProtocol
Actor.log.debug("Serializing ActorRef to binary:\n" + protocol)
protocol.toByteArray
}
/**

View file

@ -20,7 +20,6 @@ class SerializableActorSpec extends
describe("SerializableActor") {
it("should be able to serialize and deserialize a JavaSerializableActor") {
val actor1 = actorOf[JavaSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
val bytes = actor1.toBinary
@ -30,9 +29,9 @@ class SerializableActorSpec extends
(actor2 !! "hello").getOrElse("_") should equal("world 2")
}
/*
it("should be able to serialize and deserialize a ProtobufSerializableActor") {
val actor1 = actorOf[ProtobufSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
(actor1 !! "hello").getOrElse("_") should equal("world 2")
@ -43,8 +42,6 @@ class SerializableActorSpec extends
(actor2 !! "hello").getOrElse("_") should equal("world 3")
}
/*
it("should be able to serialize and deserialize a JavaJSONSerializableActor") {
val actor1 = actorOf[JavaJSONSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))

View file

@ -0,0 +1,6 @@
project.name=Akka Plugin
project.organization=se.scalablesolutions.akka
# mirrors akka version
project.version=0.9.1
sbt.version=0.7.4
build.scala.versions=2.7.7

View file

@ -0,0 +1,3 @@
import sbt._
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info)

View file

@ -0,0 +1,48 @@
import sbt._
object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
}
trait AkkaBaseProject extends BasicScalaProject {
import AkkaRepositories._
// Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases)
// is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", AkkaRepo)
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo)
val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
}
trait AkkaProject extends AkkaBaseProject {
val akkaVersion = "0.9.1"
// convenience method
def akkaModule(module: String) = "se.scalablesolutions.akka" %% ("akka-" + module) % akkaVersion
// akka core dependency by default
val akkaCore = akkaModule("core")
}