diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 2fd4a217db..b5313afb7c 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -435,7 +435,7 @@ trait Actor extends TransactionManagement { _isShutDown = true shutdown ActorRegistry.unregister(this) -// _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) } } @@ -483,8 +483,7 @@ trait Actor extends TransactionManagement { def send(message: Any) = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) postMessageToMailbox(message, None) - else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -784,7 +783,7 @@ trait Actor extends TransactionManagement { actor } - private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -826,7 +825,7 @@ trait Actor extends TransactionManagement { } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout( + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 0a7bcfa2c8..9dba7b739a 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -24,7 +24,56 @@ import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicLong -import scala. collection.mutable.{HashSet, HashMap} +import scala.collection.mutable.{HashSet, HashMap} + +/* +class RemoteActorHandle(id: String, className: String, timeout: Long, hostname: String, port: Int) extends Actor { + start + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(id) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + if (sender.isDefined) { + val s = sender.get + requestBuilder.setSourceTarget(s.getClass.getName) + requestBuilder.setSourceUuid(s.uuid) + val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send(requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(id) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } + + def receive = { case _ => {} } +} + +*/ /** * @author Jonas Bonér @@ -32,6 +81,7 @@ import scala. collection.mutable.{HashSet, HashMap} object RemoteRequestIdFactory { private val nodeId = UUID.newUuid private val id = new AtomicLong + def nextId: Long = id.getAndIncrement + nodeId } @@ -45,6 +95,65 @@ object RemoteClient extends Logging { private val remoteClients = new HashMap[String, RemoteClient] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] + // FIXME: simplify overloaded methods when we have Scala 2.8 +/* + def actorFor(className: String, hostname: String, port: Int): Actor = + actorFor(className, className, 5000, hostname, port) + + def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor = + actorFor(actorId, className, 5000, hostname, port) +*/ + def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor = + actorFor(className, className, timeout, hostname, port) + + def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = { + new Actor { + start + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(actorId) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + if (sender.isDefined) { + val s = sender.get + requestBuilder.setSourceTarget(s.getClass.getName) + requestBuilder.setSourceUuid(s.uuid) + val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send(requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(actorId) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } + + def receive = {case _ => {}} + } + } + def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) def clientFor(address: InetSocketAddress): RemoteClient = synchronized { @@ -155,7 +264,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { } else { futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFutureResult(request.getTimeout) + else new DefaultCompletableFutureResult(request.getTimeout) futures.put(request.getId, futureResult) connection.getChannel.write(request) Some(futureResult) @@ -185,21 +294,21 @@ class RemoteClientPipelineFactory(name: String, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) - val protobufEnc = new ProtobufEncoder + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) + val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) + case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) case _ => None } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) val stages: Array[ChannelHandler] = - zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) - .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) + zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) + .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) new StaticChannelPipeline(stages: _*) } } @@ -207,7 +316,7 @@ class RemoteClientPipelineFactory(name: String, /** * @author Jonas Bonér */ -@ChannelPipelineCoverage { val value = "all" } +@ChannelPipelineCoverage {val value = "all"} class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFutureResult], val supervisors: ConcurrentMap[String, Actor], @@ -215,7 +324,7 @@ class RemoteClientHandler(val name: String, val remoteAddress: SocketAddress, val timer: HashedWheelTimer, val client: RemoteClient) - extends SimpleChannelUpstreamHandler with Logging { + extends SimpleChannelUpstreamHandler with Logging { import Actor.Sender.Self override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -284,7 +393,7 @@ class RemoteClientHandler(val name: String, val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$'))) val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length) exceptionType - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exceptionMessage).asInstanceOf[Throwable] + .getConstructor(Array[Class[_]](classOf[String]): _*) + .newInstance(exceptionMessage).asInstanceOf[Throwable] } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 3d77046df6..b112a1bddc 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -182,13 +182,19 @@ class RemoteServer extends Logging { } } - def shutdown = { + def shutdown = if (isRunning) { RemoteServer.unregister(hostname, port) openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) } + + // TODO: register active object in RemoteServer as well + def register(actor: Actor) = if (isRunning) { + log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) + } } case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) @@ -256,8 +262,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val message = event.getMessage - if (message eq null) throw new IllegalStateException( - "Message in remote MessageEvent is null: " + event) + if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequest]) { handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) } diff --git a/akka-core/src/test/scala/AllTest.scala b/akka-core/src/test/scala/AllTest.scala index 37604e2e7a..fdf3351298 100644 --- a/akka-core/src/test/scala/AllTest.scala +++ b/akka-core/src/test/scala/AllTest.scala @@ -4,7 +4,7 @@ import junit.framework.Test import junit.framework.TestCase import junit.framework.TestSuite -import se.scalablesolutions.akka.actor.{RemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest} +import se.scalablesolutions.akka.actor.{ClientInitiatedRemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest} object AllTest extends TestCase { def suite(): Test = { @@ -16,7 +16,7 @@ object AllTest extends TestCase { suite.addTestSuite(classOf[ThreadBasedActorTest]) suite.addTestSuite(classOf[ReactorBasedSingleThreadEventDrivenDispatcherTest]) suite.addTestSuite(classOf[ReactorBasedThreadPoolEventDrivenDispatcherTest]) - suite.addTestSuite(classOf[RemoteActorTest]) + suite.addTestSuite(classOf[ClientInitiatedRemoteActorTest]) suite.addTestSuite(classOf[InMemoryActorTest]) suite.addTestSuite(classOf[SchedulerTest]) //suite.addTestSuite(classOf[TransactionClasherTest]) diff --git a/akka-core/src/test/scala/RemoteActorTest.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala similarity index 63% rename from akka-core/src/test/scala/RemoteActorTest.scala rename to akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala index f0e7016ded..7de701d4f0 100644 --- a/akka-core/src/test/scala/RemoteActorTest.scala +++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala @@ -9,46 +9,47 @@ import org.junit.{Test, Before, After} import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.dispatch.Dispatchers -object Global { - var oneWay = "nada" - var remoteReply = "nada" -} -class RemoteActorSpecActorUnidirectional extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) +object ClientInitiatedRemoteActorTest { + object Global { + var oneWay = "nada" + var remoteReply = "nada" + } + case class Send(actor: Actor) - def receive = { - case "OneWay" => - Global.oneWay = "received" + class RemoteActorSpecActorUnidirectional extends Actor { + dispatcher = Dispatchers.newThreadBasedDispatcher(this) + + def receive = { + case "OneWay" => + Global.oneWay = "received" + } + } + + class RemoteActorSpecActorBidirectional extends Actor { + def receive = { + case "Hello" => + reply("World") + case "Failure" => + throw new RuntimeException("expected") + } + } + + class RemoteActorSpecActorAsyncSender extends Actor { + def receive = { + case Send(actor: Actor) => + actor ! "Hello" + case "World" => + Global.remoteReply = "replied" + } + + def send(actor: Actor) { + this ! Send(actor) + } } } -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - reply("World") - case "Failure" => - throw new RuntimeException("expected") - } -} - -case class Send(actor: Actor) - -class RemoteActorSpecActorAsyncSender extends Actor { - def receive = { - case Send(actor: Actor) => - actor ! "Hello" - case "World" => - Global.remoteReply = "replied" - } - - def send(actor: Actor) { - this ! Send(actor) - } -} - -class RemoteActorTest extends JUnitSuite { - import Actor.Sender.Self - +class ClientInitiatedRemoteActorTest extends JUnitSuite { + import ClientInitiatedRemoteActorTest._ akka.Config.config val HOSTNAME = "localhost" @@ -57,6 +58,8 @@ class RemoteActorTest extends JUnitSuite { var s1: RemoteServer = null var s2: RemoteServer = null + import Actor.Sender.Self + @Before def init() { s1 = new RemoteServer() @@ -116,26 +119,7 @@ class RemoteActorTest extends JUnitSuite { actor.stop } - /* - This test does not throw an exception since the - _contactAddress is always defined via the - global configuration if not set explicitly. - - @Test - def shouldSendRemoteReplyException = { - implicit val timeout = 500000000L - val actor = new RemoteActorSpecActorBidirectional - actor.makeRemote(HOSTNAME, PORT1) - actor.start - - val sender = new RemoteActorSpecActorAsyncSender - sender.start - sender.send(actor) - Thread.sleep(500) - assert("exception" === Global.remoteReply) - actor.stop - } - */ + @Test def shouldSendReceiveException = { implicit val timeout = 500000000L diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala new file mode 100644 index 0000000000..232e8e539b --- /dev/null +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala @@ -0,0 +1,144 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.TimeUnit + +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.dispatch.Dispatchers + +object ServerInitiatedRemoteActorTest { + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + object Global { + var oneWay = "nada" + var remoteReply = "nada" + } + + class RemoteActorSpecActorUnidirectional extends Actor { + dispatcher = Dispatchers.newThreadBasedDispatcher(this) + start + + def receive = { + case "OneWay" => + println("================== ONEWAY") + Global.oneWay = "received" + } + } + + class RemoteActorSpecActorBidirectional extends Actor { + start + def receive = { + case "Hello" => + reply("World") + case "Failure" => + throw new RuntimeException("expected") + } + } + + case class Send(actor: Actor) + + class RemoteActorSpecActorAsyncSender extends Actor { + start + def receive = { + case Send(actor: Actor) => + actor ! "Hello" + case "World" => + Global.remoteReply = "replied" + } + + def send(actor: Actor) { + this ! Send(actor) + } + } +} + +class ServerInitiatedRemoteActorTest extends JUnitSuite { + import ServerInitiatedRemoteActorTest._ + + import Actor.Sender.Self + akka.Config.config + + private val unit = TimeUnit.MILLISECONDS + + @Before + def init() { + server = new RemoteServer() + + server.start(HOSTNAME, PORT) + + server.register(new RemoteActorSpecActorUnidirectional) + server.register(new RemoteActorSpecActorBidirectional) + server.register(new RemoteActorSpecActorAsyncSender) + + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished() { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } + + @Test + def shouldSendOneWay = { + val actor = RemoteClient.actorFor( + "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorUnidirectional", + 5000L, + HOSTNAME, PORT) + val result = actor ! "OneWay" + Thread.sleep(1000) + assert("received" === Global.oneWay) + actor.stop + } + + @Test + def shouldSendReplyAsync = { + val actor = RemoteClient.actorFor( + "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional", + 5000L, + HOSTNAME, PORT) + val result = actor !! "Hello" + assert("World" === result.get.asInstanceOf[String]) + actor.stop + } + + @Test + def shouldSendRemoteReply = { + implicit val timeout = 500000000L + val actor = RemoteClient.actorFor( + "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional", + timeout, + HOSTNAME, PORT) + + val sender = new RemoteActorSpecActorAsyncSender + sender.setReplyToAddress(HOSTNAME, PORT) + sender.start + sender.send(actor) + Thread.sleep(1000) + assert("replied" === Global.remoteReply) + actor.stop + } + + @Test + def shouldSendReceiveException = { + implicit val timeout = 500000000L + val actor = RemoteClient.actorFor( + "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional", + timeout, + HOSTNAME, PORT) + try { + actor !! "Failure" + fail("Should have thrown an exception") + } catch { + case e => + assert("expected" === e.getMessage()) + } + actor.stop + } +}