Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
a569913af1
6 changed files with 102 additions and 54 deletions
|
|
@ -355,7 +355,7 @@ object ActiveObject extends Logging {
|
|||
}
|
||||
|
||||
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
val proxy = Proxy.newInstance(target, false, false)
|
||||
val proxy = Proxy.newInstance(target, true, false)
|
||||
val context = injectActiveObjectContext(proxy)
|
||||
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
|
||||
actorRef.timeout = timeout
|
||||
|
|
@ -368,7 +368,7 @@ object ActiveObject extends Logging {
|
|||
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
|
||||
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
val context = injectActiveObjectContext(target)
|
||||
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
|
||||
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
|
||||
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
|
||||
actorRef.timeout = timeout
|
||||
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
|
||||
|
|
@ -462,7 +462,7 @@ object ActiveObject extends Logging {
|
|||
val parent = clazz.getSuperclass
|
||||
if (parent != null) injectActiveObjectContext0(activeObject, parent)
|
||||
else {
|
||||
log.warning(
|
||||
log.trace(
|
||||
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
|
||||
activeObject.getClass.getName)
|
||||
None
|
||||
|
|
|
|||
|
|
@ -35,27 +35,59 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
|
|||
self.makeRemote(hostname, port)
|
||||
}
|
||||
|
||||
/**
|
||||
* Base trait defining a serializable actor.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait SerializableActor extends Actor
|
||||
|
||||
/**
|
||||
* Base trait defining a stateless serializable actor.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait StatelessSerializableActor extends SerializableActor
|
||||
|
||||
/**
|
||||
* Mix in this trait to create a serializable actor, serializable through
|
||||
* a custom serialization protocol.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait SerializableActor extends Actor {
|
||||
val serializer: Serializer
|
||||
trait StatefulSerializableActor extends SerializableActor {
|
||||
def toBinary: Array[Byte]
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix in this trait to create a serializable actor, serializable through
|
||||
* a custom serialization protocol. This actor <b>is</b> the serialized state.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait StatefulSerializerSerializableActor extends StatefulSerializableActor {
|
||||
val serializer: Serializer
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix in this trait to create a serializable actor, serializable through
|
||||
* a custom serialization protocol. This actor <b>is wrapping</b> serializable state.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait StatefulWrappedSerializableActor extends StatefulSerializableActor {
|
||||
def fromBinary(bytes: Array[Byte])
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix in this trait to create a serializable actor, serializable through
|
||||
* Protobuf.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
|
||||
val serializer = Serializer.Protobuf
|
||||
trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor {
|
||||
def toBinary: Array[Byte] = toProtobuf.toByteArray
|
||||
def fromBinary(bytes: Array[Byte]) = fromProtobuf(serializer.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
|
||||
def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
|
||||
|
||||
val clazz: Class[T]
|
||||
def toProtobuf: T
|
||||
|
|
@ -68,7 +100,7 @@ trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait JavaSerializableActor extends SerializableActor {
|
||||
trait JavaSerializableActor extends StatefulSerializerSerializableActor {
|
||||
@transient val serializer = Serializer.Java
|
||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
||||
}
|
||||
|
|
@ -79,7 +111,7 @@ trait JavaSerializableActor extends SerializableActor {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait JavaJSONSerializableActor extends SerializableActor {
|
||||
trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
|
||||
val serializer = Serializer.JavaJSON
|
||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
||||
}
|
||||
|
|
@ -90,7 +122,7 @@ trait JavaJSONSerializableActor extends SerializableActor {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait ScalaJSONSerializableActor extends SerializableActor {
|
||||
trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor {
|
||||
val serializer = Serializer.ScalaJSON
|
||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,8 @@ object ActorRef {
|
|||
/**
|
||||
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
|
||||
*/
|
||||
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
|
||||
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
||||
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
|
||||
RemoteActorRef(
|
||||
protocol.getUuid,
|
||||
protocol.getActorClassname,
|
||||
|
|
@ -82,6 +83,7 @@ object ActorRef {
|
|||
protocol.getHomeAddress.getPort,
|
||||
protocol.getTimeout,
|
||||
loader)
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
|
||||
|
|
@ -99,11 +101,15 @@ object ActorRef {
|
|||
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
|
||||
*/
|
||||
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
||||
val serializerClass =
|
||||
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
|
||||
else Class.forName(protocol.getSerializerClassname)
|
||||
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
|
||||
|
||||
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
|
||||
|
||||
val serializer = if (protocol.hasSerializerClassname) {
|
||||
val serializerClass =
|
||||
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
|
||||
else Class.forName(protocol.getSerializerClassname)
|
||||
Some(serializerClass.newInstance.asInstanceOf[Serializer])
|
||||
} else None
|
||||
|
||||
val lifeCycle =
|
||||
if (protocol.hasLifeCycle) {
|
||||
val lifeCycleProtocol = protocol.getLifeCycle
|
||||
|
|
@ -120,8 +126,9 @@ object ActorRef {
|
|||
if (protocol.hasSupervisor)
|
||||
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
|
||||
val hotswap =
|
||||
if (protocol.hasHotswapStack) Some(serializer
|
||||
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
||||
.asInstanceOf[PartialFunction[Any, Unit]])
|
||||
else None
|
||||
|
|
@ -349,10 +356,12 @@ trait ActorRef extends TransactionManagement {
|
|||
/**
|
||||
* Returns the 'Serializer' instance for the Actor as an Option.
|
||||
* <p/>
|
||||
* It returns 'Some(serializer)' if the Actor is serializable and 'None' if not.
|
||||
* It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor
|
||||
* trait (which has a Serializer defined) and 'None' if not.
|
||||
*/
|
||||
def serializer: Option[Serializer] =
|
||||
if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer)
|
||||
if (actor.isInstanceOf[StatefulSerializerSerializableActor])
|
||||
Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer)
|
||||
else None
|
||||
|
||||
/**
|
||||
|
|
@ -710,15 +719,25 @@ sealed class LocalActorRef private[akka](
|
|||
__supervisor: Option[ActorRef],
|
||||
__hotswap: Option[PartialFunction[Any, Unit]],
|
||||
__loader: ClassLoader,
|
||||
__serializer: Serializer) = {
|
||||
__serializer: Option[Serializer]) = {
|
||||
this(() => {
|
||||
val actorClass = __loader.loadClass(__actorClassName)
|
||||
val actorInstance = actorClass.newInstance
|
||||
if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) {
|
||||
val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]]
|
||||
if (actorInstance.isInstanceOf[StatelessSerializableActor]) {
|
||||
actorInstance.asInstanceOf[Actor]
|
||||
} else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) {
|
||||
__serializer
|
||||
.getOrElse(throw new IllegalStateException("No serializer defined for SerializableActor [" + actorClass.getName + "]"))
|
||||
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
|
||||
} else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) {
|
||||
val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor]
|
||||
instance.fromBinary(__actorBytes)
|
||||
instance
|
||||
} else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
|
||||
} else throw new IllegalStateException(
|
||||
"Can't deserialize Actor that is not an instance of one of:\n" +
|
||||
"\n\t- StatelessSerializableActor" +
|
||||
"\n\t- StatefulSerializerSerializableActor" +
|
||||
"\n\t- StatefulWrappedSerializableActor")
|
||||
})
|
||||
loader = Some(__loader)
|
||||
isDeserialized = true
|
||||
|
|
@ -777,7 +796,8 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
|
||||
if (!isSerializable) throw new IllegalStateException(
|
||||
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
|
||||
"Can't serialize an ActorRef using SerializedActorRefProtocol" +
|
||||
"\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
|
||||
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
||||
|
|
@ -798,23 +818,19 @@ sealed class LocalActorRef private[akka](
|
|||
}
|
||||
}
|
||||
|
||||
val serializerClassname = serializer
|
||||
.getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined"))
|
||||
.getClass.getName
|
||||
val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build
|
||||
|
||||
val builder = SerializedActorRefProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setId(id)
|
||||
.setActorClassname(actorClass.getName)
|
||||
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary))
|
||||
.setSerializerClassname(serializerClassname)
|
||||
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[StatefulSerializableActor].toBinary))
|
||||
.setOriginalAddress(originalAddress)
|
||||
.setIsTransactor(isTransactor)
|
||||
.setTimeout(timeout)
|
||||
|
||||
serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName))
|
||||
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
|
||||
supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol))
|
||||
supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol))
|
||||
// FIXME: how to serialize the hotswap PartialFunction ??
|
||||
// hotswap.foreach(builder.setHotswapStack(_))
|
||||
builder.build
|
||||
|
|
@ -829,8 +845,10 @@ sealed class LocalActorRef private[akka](
|
|||
* Serializes the ActorRef instance into a byte array (Array[Byte]).
|
||||
*/
|
||||
def toBinary: Array[Byte] = {
|
||||
if (isSerializable) toSerializedActorRefProtocol.toByteArray
|
||||
else toRemoteActorRefProtocol.toByteArray
|
||||
val protocol = if (isSerializable) toSerializedActorRefProtocol
|
||||
else toRemoteActorRefProtocol
|
||||
Actor.log.debug("Serializing ActorRef to binary:\n" + protocol)
|
||||
protocol.toByteArray
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -184,5 +184,3 @@ trait StmUtil {
|
|||
}.execute()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ class SerializableActorSpec extends
|
|||
describe("SerializableActor") {
|
||||
it("should be able to serialize and deserialize a JavaSerializableActor") {
|
||||
val actor1 = actorOf[JavaSerializableTestActor].start
|
||||
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
|
||||
val bytes = actor1.toBinary
|
||||
|
|
@ -30,9 +29,9 @@ class SerializableActorSpec extends
|
|||
(actor2 !! "hello").getOrElse("_") should equal("world 2")
|
||||
}
|
||||
|
||||
/*
|
||||
it("should be able to serialize and deserialize a ProtobufSerializableActor") {
|
||||
val actor1 = actorOf[ProtobufSerializableTestActor].start
|
||||
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||
|
||||
|
|
@ -43,8 +42,6 @@ class SerializableActorSpec extends
|
|||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
it("should be able to serialize and deserialize a JavaJSONSerializableActor") {
|
||||
val actor1 = actorOf[JavaJSONSerializableTestActor].start
|
||||
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
|
||||
|
|
|
|||
|
|
@ -47,12 +47,13 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
|
|||
bean.setTarget("java.lang.String")
|
||||
assert(bean.getObjectType == classOf[String])
|
||||
}
|
||||
it("should create a proxy of type ResourceEditor") {
|
||||
val bean = new ActiveObjectFactoryBean()
|
||||
// we must have a java class here
|
||||
bean.setTarget("org.springframework.core.io.ResourceEditor")
|
||||
val entries = new PropertyEntries()
|
||||
val entry = new PropertyEntry()
|
||||
|
||||
it("should create a proxy of type ResourceEditor") {
|
||||
val bean = new ActiveObjectFactoryBean()
|
||||
// we must have a java class here
|
||||
bean.setTarget("org.springframework.core.io.ResourceEditor")
|
||||
val entries = new PropertyEntries()
|
||||
val entry = new PropertyEntry()
|
||||
entry.name = "source"
|
||||
entry.value = "sourceBeanIsAString"
|
||||
entries.add(entry)
|
||||
|
|
@ -60,14 +61,16 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
|
|||
assert(bean.getObjectType == classOf[ResourceEditor])
|
||||
|
||||
// Check that we have injected the depencency correctly
|
||||
val target:ResourceEditor = bean.createInstance.asInstanceOf[ResourceEditor]
|
||||
assert(target.getSource === entry.value)
|
||||
}
|
||||
val target:ResourceEditor = bean.createInstance.asInstanceOf[ResourceEditor]
|
||||
assert(target.getSource === entry.value)
|
||||
}
|
||||
|
||||
/*
|
||||
it("should create an application context and inject a string dependency") {
|
||||
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
|
||||
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
|
||||
assert(target.getSource === "someString")
|
||||
}
|
||||
|
||||
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
|
||||
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
|
||||
assert(target.getSource === "someString")
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue