Merge branch 'master' of git://github.com/jboner/akka into session-actors

This commit is contained in:
Paul Pacheco 2010-11-14 13:10:37 -06:00
commit 26f23ac5ec
179 changed files with 3961 additions and 4471 deletions

View file

@ -16,7 +16,6 @@ import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
@ -31,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
/**
* Use this object if you need a single remote server on a specific node.
@ -66,10 +66,10 @@ object RemoteNode extends RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
val UUID_PREFIX = "uuid:"
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val UUID_PREFIX = "uuid:"
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
@ -243,7 +243,6 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true
Cluster.registerLocalNode(hostname, port)
notifyListeners(RemoteServerStarted(this))
}
} catch {
@ -261,7 +260,6 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
notifyListeners(RemoteServerShutdown(this))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
@ -431,7 +429,7 @@ class RemoteServerPipelineFactory(
}
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
@ -530,7 +528,7 @@ class RemoteServerHandler(
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
log.debug("Received RemoteMessageProtocol[\n%s]", request.toString)
log.debug("Received RemoteMessageProtocol[\n%s]".format(request.toString))
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
@ -571,41 +569,46 @@ class RemoteServerHandler(
message,
request.getActorInfo.getTimeout,
None,
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
onComplete(f => {
val result = f.result
val exception = f.exception
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
if (exception.isDefined) {
log.debug("Returning exception from actor invocation [%s]".format(exception.get))
try {
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
else if (result.isDefined) {
log.debug("Returning result from actor invocation [%s]".format(result.get))
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result.get),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
try {
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
try {
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
}
override def onCompleteException(exception: Throwable) {
try {
channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
}
}
))
)
))
}
}
@ -622,7 +625,10 @@ class RemoteServerHandler(
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
else {
val result = messageReceiver.invoke(typedActor, args: _*)
val result = messageReceiver.invoke(typedActor, args: _*) match {
case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking
case other => other
}
log.debug("Returning result from remote typed actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(