Fixed bug in remote actors + improved scaladoc

This commit is contained in:
Jonas Bonér 2010-06-04 15:58:26 +02:00
parent 02af674a96
commit 43aecb6e42
9 changed files with 21 additions and 122 deletions

View file

@ -300,22 +300,6 @@ trait ActorRef extends TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
* Uses the time-out defined in the Actor.
* <p/>
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
// def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout)
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
@ -349,13 +333,14 @@ trait ActorRef extends TransactionManagement {
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.")
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object." +
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently

View file

@ -40,7 +40,7 @@ import se.scalablesolutions.akka.config.Config.config
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
val THROUGHPUT = config.getInt("akka.dispatcher.throughput", 5)
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorRef) = {

View file

@ -155,7 +155,7 @@ object RemoteClient extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
@ -203,6 +203,10 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
}
}
def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
@ -222,17 +226,13 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
throw exception
}
def registerSupervisorForActor(actorRef: ActorRef) =
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
def deregisterSupervisorForActor(actorRef: ActorRef) =
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
}
/**

View file

@ -97,6 +97,7 @@ object RemoteProtocolBuilder {
}
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
(new Exception).printStackTrace
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)

View file

@ -364,12 +364,12 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.hasSender) {
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtobuf(sender, applicationLoader)))
} else {
val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
val resultOrNone = actorRef !! message
val resultOrNone = actorRef.!!(message)(sender)
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder

View file

@ -164,7 +164,7 @@ object Transaction {
}
/**
* See ScalaDoc on Transaction.Local class.
* See ScalaDoc on Transaction.Local class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)

View file

@ -25,13 +25,10 @@
"sample.security.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
</actor>
<dispatcher>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
</dispatcher>
</actor>
<stm>
service = on

File diff suppressed because one or more lines are too long

View file

@ -1,65 +0,0 @@
<html>
<head>
<title>Push Services Demo</title>
<script language="javascript" src="jquery-1.3.2.min.js">
</script>
<style>
.hide {
visibility: hidden;
}
</style>
</head>
<body id="body">
<div id="container">
<div id="chat">
<div id="chatwindow">
</div>
<div>
<input type="text" name="name" id="name"/>
<input type="button" name="login" value="Login" id="login"/>
<textarea id="msg" class="hide" name="message" rows="10" cols="150"></textarea>
<input type="button" name="Send" id="send" value="Send" class="hide"/>
</div>
</div>
</div>
<script language="javascript">
$(function() {
window.app = {
update : function(data) {
if (data && data.name)
$('#chatwindow').append('<p>' + data.name + (data.message ? (': ' + data.message) : '') + '</p>');
else
alert(data);
}
};
$('#send').click(function(e) {
var message = $('#msg').val();
$('#msg').val('');
$.post('/chat',
{
'action' : 'post',
'name' : $('#name').val(),
'message' : message
});
});
$('#login').click(function(e) {
$.post('/chat',
{
'action' : 'login',
'name' : $('#name').val()
},
function(data) {
$('#login').hide();
$('#name').attr('disabled', 'disabled');
$('#msg').removeClass('hide');
$('#send').removeClass('hide');
$('<iframe style="display:hidden;" id="comet" src="/chat"></iframe>').appendTo('#body');
});
});
});
</script>
</body>
</html>