Minor code refresh

This commit is contained in:
Viktor Klang 2010-01-18 22:33:29 +01:00
parent 8093c1bfb1
commit c1fe41f8ff
2 changed files with 26 additions and 36 deletions

View file

@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder,RemoteServer, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
@ -72,18 +72,8 @@ object Actor extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender extends Actor {
object Sender {
implicit val Self: Option[Actor] = None
def receive = {
case unknown =>
Actor.log.error(
"Actor.Sender can't process messages. Received message [%s]." +
"This error could occur if you either:" +
"\n\t- Explicitly send a message to the Actor.Sender object." +
"\n\t- Invoking the 'reply(..)' method or sending a message to the 'sender' reference " +
"\n\t when you have sent the original request from a instance *not* being an actor.", unknown)
}
}
/**
@ -225,7 +215,7 @@ trait Actor extends TransactionManagement {
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] var _contactAddress: Option[InetSocketAddress] = None
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
// ====================================
@ -596,7 +586,7 @@ trait Actor extends TransactionManagement {
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network.")
"\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
case Some(future) =>
future.completeWithResult(message)
}
@ -635,17 +625,17 @@ trait Actor extends TransactionManagement {
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
_remoteAddress = Some(address)
if(_contactAddress.isEmpty)
setContactAddress(RemoteServer.HOSTNAME,RemoteServer.PORT)
if(_replyToAddress.isEmpty)
setReplyToAddress(Actor.HOSTNAME,Actor.PORT)
}
/**
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
*/
def setContactAddress(hostname: String, port: Int): Unit = setContactAddress(new InetSocketAddress(hostname, port))
def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
def setContactAddress(address: InetSocketAddress): Unit = _contactAddress = Some(address)
def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address)
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@ -803,26 +793,24 @@ trait Actor extends TransactionManagement {
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
if(id.isDefined)
requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
if (sender.isDefined) {
requestBuilder.setSourceTarget(sender.get.getClass.getName)
requestBuilder.setSourceUuid(sender.get.uuid)
log.debug("Setting sending actor as %s, %s", sender.get.getClass.getName, _contactAddress)
if(sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
if (sender.get._contactAddress.isDefined) {
val addr = sender.get._contactAddress.get
requestBuilder.setSourceHostname(addr.getHostName())
requestBuilder.setSourcePort(addr.getPort())
} else {
// set the contact address to the default values from the
// configuration file
requestBuilder.setSourceHostname(Actor.HOSTNAME)
requestBuilder.setSourcePort(Actor.PORT)
}
val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
@ -831,7 +819,9 @@ trait Actor extends TransactionManagement {
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
} else invocation.send
}
else
invocation.send
}
}
@ -926,7 +916,7 @@ trait Actor extends TransactionManagement {
if (senderFuture.isEmpty) throw new StmException(
"Can't continue transaction in a one-way fire-forget message send" +
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
"\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
"\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
atomic {
proceed
}

View file

@ -108,7 +108,7 @@ class RemoteActorTest extends JUnitSuite {
actor.start
val sender = new RemoteActorSpecActorAsyncSender
sender.setContactAddress(HOSTNAME, PORT1)
sender.setReplyToAddress(HOSTNAME, PORT1)
sender.start
sender.send(actor)
Thread.sleep(1000)