Now forward works with !! + added possibility to set a ClassLoader for the Serializer.* classes

This commit is contained in:
Jonas Bonér 2009-12-27 22:56:55 +01:00
parent 206c6eee53
commit 56d6c0d198
11 changed files with 110 additions and 53 deletions

View file

@ -84,6 +84,11 @@
<artifactId>dispatch-json_2.7.7</artifactId>
<version>0.6.4</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>net.databinder</groupId>
<artifactId>dispatch-http_2.7.7</artifactId>

View file

@ -287,7 +287,7 @@ private[akka] sealed class ActiveObjectAspect {
val id = actor.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val remoteMessage = requestBuilder.build
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage)
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None)
if (oneWay_?) null // for void methods
else {
if (future.isDefined) {

View file

@ -429,8 +429,7 @@ trait Actor extends TransactionManagement {
messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
init // call user-defined init method
//if (isTransactional) this !! TransactionalInit
init
}
Actor.log.debug("[%s] has started", toString)
this
@ -520,7 +519,7 @@ trait Actor extends TransactionManagement {
if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
try {
@ -544,6 +543,7 @@ trait Actor extends TransactionManagement {
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
@ -570,13 +570,16 @@ trait Actor extends TransactionManagement {
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
* Works with both '!' and '!!'.
*/
def forward(message: Any)(implicit sender: Option[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor"))
if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor")
postMessageToMailbox(message, forwarder.getSender)
if (forwarder.getSenderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, forwarder.getSenderFuture)
else if (forwarder.getSender.isDefined) postMessageToMailbox(message, forwarder.getSender)
else throw new IllegalStateException("Can't forward message when initial sender is not an actor")
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
@ -789,6 +792,8 @@ trait Actor extends TransactionManagement {
private[akka] def getSender = sender
private[akka] def getSenderFuture = senderFuture
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
val actor = actorClass.newInstance.asInstanceOf[T]
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
@ -829,7 +834,7 @@ trait Actor extends TransactionManagement {
}
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
if (_isEventBased) {
@ -840,7 +845,9 @@ trait Actor extends TransactionManagement {
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, timeout: Long): CompletableFutureResult = {
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -853,11 +860,12 @@ trait Actor extends TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = new DefaultCompletableFutureResult(timeout)
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.add(invocation)

View file

@ -6,13 +6,14 @@ package se.scalablesolutions.akka.actor
import java.io.File
import java.net.URLClassLoader
import se.scalablesolutions.akka.util.{Bootable,Logging}
import se.scalablesolutions.akka.Config._
/**
* Handles all modules in the deploy directory (load and unload)
*/
trait BootableActorLoaderService extends Bootable with Logging {
import Config._
val BOOT_CLASSES = config.getList("akka.boot")
var applicationLoader: Option[ClassLoader] = None
@ -38,7 +39,6 @@ trait BootableActorLoaderService extends Bootable with Logging {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
Some(loader)
}

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.util.{Bootable,Logging}
import se.scalablesolutions.akka.Config.config
/**
* This bundle/service is responsible for booting up and shutting down the remote actors facility
@ -15,8 +16,6 @@ import se.scalablesolutions.akka.util.{Bootable,Logging}
trait BootableRemoteActorService extends Bootable with Logging {
self : BootableActorLoaderService =>
import Config._
protected lazy val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(self.applicationLoader)
}, "Akka Remote Service")

View file

@ -27,6 +27,9 @@ import java.util.concurrent.atomic.AtomicLong
import org.codehaus.aspectwerkz.proxy.Uuid
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteRequestIdFactory {
private val nodeId = Uuid.newUuid
private val id = new AtomicLong
@ -55,12 +58,12 @@ object RemoteClient extends Logging {
}
}
/*
* Clean-up all open connections
*/
/**
* Clean-up all open connections.
*/
def shutdownAll() = synchronized {
clients.foreach({case (addr, client) => client.shutdown})
clients.clear
clients.foreach({case (addr, client) => client.shutdown})
clients.clear
}
}
@ -107,16 +110,17 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
connection.getChannel.getCloseFuture.awaitUninterruptibly
channelFactory.releaseExternalResources
}
timer.stop
timer.stop
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = new DefaultCompletableFutureResult(request.getTimeout)
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@ -142,7 +146,7 @@ class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
timer: HashedWheelTimer) extends ChannelPipelineFactory {
timer: HashedWheelTimer) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
@ -173,7 +177,7 @@ class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val timer: HashedWheelTimer)
val timer: HashedWheelTimer)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self

View file

@ -11,22 +11,38 @@ import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest,
import com.google.protobuf.{Message, ByteString}
object RemoteProtocolBuilder {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(classLoader: ClassLoader) = {
SERIALIZER_JAVA = new Serializer.Java
SERIALIZER_JAVA_JSON = new Serializer.JavaJSON
SERIALIZER_SCALA_JSON = new Serializer.ScalaJSON
SERIALIZER_JAVA.setClassLoader(classLoader)
SERIALIZER_JAVA_JSON.setClassLoader(classLoader)
SERIALIZER_SCALA_JSON.setClassLoader(classLoader)
}
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.ScalaJSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.JavaJSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
Serializer.Protobuf.in(request.getMessage.toByteArray, Some(messageClass))
val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
unbox(Serializer.Java.in(request.getMessage.toByteArray, None))
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
@ -38,16 +54,16 @@ object RemoteProtocolBuilder {
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.ScalaJSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.JavaJSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
Serializer.Protobuf.in(reply.getMessage.toByteArray, Some(messageClass))
val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
unbox(Serializer.Java.in(reply.getMessage.toByteArray, None))
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
@ -63,7 +79,7 @@ object RemoteProtocolBuilder {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(Serializer.Java.out(serializable.getClass)))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
@ -77,7 +93,7 @@ object RemoteProtocolBuilder {
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
}
}
@ -91,7 +107,7 @@ object RemoteProtocolBuilder {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(Serializer.Java.out(serializable.getClass)))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
@ -105,7 +121,7 @@ object RemoteProtocolBuilder {
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
}
}

View file

@ -117,7 +117,7 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true
Cluster.registerLocalNode(hostname,port)
Cluster.registerLocalNode(hostname, port)
}
} catch {
case e => log.error(e, "Could not start up remote server")
@ -127,15 +127,17 @@ class RemoteServer extends Logging {
def shutdown = {
openChannels.close.awaitUninterruptibly()
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname,port)
Cluster.deregisterLocalNode(hostname, port)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, loader: Option[ClassLoader])
extends ChannelPipelineFactory {
class RemoteServerPipelineFactory(
name: String,
openChannels: ChannelGroup,
loader: Option[ClassLoader]) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@ -163,13 +165,17 @@ class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, load
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
class RemoteServerHandler(
val name: String,
openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**
* ChannelOpen overridden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is

View file

@ -6,6 +6,8 @@ package se.scalablesolutions.akka.serialization
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
import org.apache.commons.io.input.ClassLoaderObjectInputStream
import com.google.protobuf.Message
import org.codehaus.jackson.map.ObjectMapper
@ -50,6 +52,10 @@ object Serializer {
*/
object Java extends Java
class Java extends Serializer {
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = {
@ -61,7 +67,8 @@ object Serializer {
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
obj
@ -100,6 +107,10 @@ object Serializer {
class JavaJSON extends Serializer {
private val mapper = new ObjectMapper
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = {
@ -112,7 +123,8 @@ object Serializer {
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close
obj
@ -131,8 +143,13 @@ object Serializer {
class ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
private var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
// FIXME set ClassLoader on SJSONSerializer.SJSON
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes)
def in(json: String): AnyRef = SJSONSerializer.SJSON.in(json)

View file

@ -21,7 +21,8 @@
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.java.Boot",
"sample.scala.Boot",
"se.scalablesolutions.akka.security.samples.Boot"]
"se.scalablesolutions.akka.security.samples.Boot",
"se.scalablesolutions.akka.sample.chat.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations
@ -49,7 +50,7 @@
<cluster>
name = "default" # The name of the cluster
actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
serializer = "se.scalablesolutions.akka.serialization.Serializer.Java" # FQN of the serializer class
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
</cluster>
<server>

View file

@ -62,6 +62,7 @@
<module>akka-fun-test-java</module>
<module>akka-samples-scala</module>
<module>akka-samples-lift</module>
<module>akka-samples-chat</module>
<module>akka-samples-java</module>
<module>akka-samples-security</module>
</modules>