diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 7e038a7509..793c0c953d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.Transaction._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.{StmException, TransactionManagement} import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder,RemoteServer, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} @@ -72,18 +72,8 @@ object Actor extends Logging { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) - object Sender extends Actor { + object Sender { implicit val Self: Option[Actor] = None - - def receive = { - case unknown => - Actor.log.error( - "Actor.Sender can't process messages. Received message [%s]." + - "This error could occur if you either:" + - "\n\t- Explicitly send a message to the Actor.Sender object." + - "\n\t- Invoking the 'reply(..)' method or sending a message to the 'sender' reference " + - "\n\t when you have sent the original request from a instance *not* being an actor.", unknown) - } } /** @@ -225,7 +215,7 @@ trait Actor extends TransactionManagement { private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[Actor]] = None private[akka] var _supervisor: Option[Actor] = None - private[akka] var _contactAddress: Option[InetSocketAddress] = None + private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] // ==================================== @@ -596,7 +586,7 @@ trait Actor extends TransactionManagement { "\n\t\t2. Send a message from an instance that is *not* an actor" + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network.") + "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") case Some(future) => future.completeWithResult(message) } @@ -635,17 +625,17 @@ trait Actor extends TransactionManagement { if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") else { _remoteAddress = Some(address) - if(_contactAddress.isEmpty) - setContactAddress(RemoteServer.HOSTNAME,RemoteServer.PORT) + if(_replyToAddress.isEmpty) + setReplyToAddress(Actor.HOSTNAME,Actor.PORT) } /** * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. */ - def setContactAddress(hostname: String, port: Int): Unit = setContactAddress(new InetSocketAddress(hostname, port)) + def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) - def setContactAddress(address: InetSocketAddress): Unit = _contactAddress = Some(address) + def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address) /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. @@ -803,26 +793,24 @@ trait Actor extends TransactionManagement { .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) + val id = registerSupervisorAsRemoteActor - if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + if(id.isDefined) + requestBuilder.setSupervisorUuid(id.get) // set the source fields used to reply back to the original sender // (i.e. not the remote proxy actor) - if (sender.isDefined) { - requestBuilder.setSourceTarget(sender.get.getClass.getName) - requestBuilder.setSourceUuid(sender.get.uuid) - log.debug("Setting sending actor as %s, %s", sender.get.getClass.getName, _contactAddress) + if(sender.isDefined) { + val s = sender.get + requestBuilder.setSourceTarget(s.getClass.getName) + requestBuilder.setSourceUuid(s.uuid) - if (sender.get._contactAddress.isDefined) { - val addr = sender.get._contactAddress.get - requestBuilder.setSourceHostname(addr.getHostName()) - requestBuilder.setSourcePort(addr.getPort()) - } else { - // set the contact address to the default values from the - // configuration file - requestBuilder.setSourceHostname(Actor.HOSTNAME) - requestBuilder.setSourcePort(Actor.PORT) - } + val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT)) + + log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) + + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) } RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) @@ -831,7 +819,9 @@ trait Actor extends TransactionManagement { if (_isEventBased) { _mailbox.add(invocation) if (_isSuspended) invocation.send - } else invocation.send + } + else + invocation.send } } @@ -926,7 +916,7 @@ trait Actor extends TransactionManagement { if (senderFuture.isEmpty) throw new StmException( "Can't continue transaction in a one-way fire-forget message send" + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + - "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") + "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type") atomic { proceed } diff --git a/akka-core/src/test/scala/RemoteActorTest.scala b/akka-core/src/test/scala/RemoteActorTest.scala index 6bb5d8e689..f0e7016ded 100644 --- a/akka-core/src/test/scala/RemoteActorTest.scala +++ b/akka-core/src/test/scala/RemoteActorTest.scala @@ -108,7 +108,7 @@ class RemoteActorTest extends JUnitSuite { actor.start val sender = new RemoteActorSpecActorAsyncSender - sender.setContactAddress(HOSTNAME, PORT1) + sender.setReplyToAddress(HOSTNAME, PORT1) sender.start sender.send(actor) Thread.sleep(1000)