Added support for server-initiated remote actors with clients getting a dummy handle to the remote actor

This commit is contained in:
Jonas Bonér 2010-02-16 15:39:09 +01:00
parent 8fb281f4b0
commit 41766bef22
6 changed files with 321 additions and 80 deletions

View file

@ -435,7 +435,7 @@ trait Actor extends TransactionManagement {
_isShutDown = true _isShutDown = true
shutdown shutdown
ActorRegistry.unregister(this) ActorRegistry.unregister(this)
// _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
} }
} }
@ -483,8 +483,7 @@ trait Actor extends TransactionManagement {
def send(message: Any) = { def send(message: Any) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) postMessageToMailbox(message, None) if (_isRunning) postMessageToMailbox(message, None)
else throw new IllegalStateException( else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
"Actor has not been started, you need to invoke 'actor.start' before using it")
} }
/** /**
@ -784,7 +783,7 @@ trait Actor extends TransactionManagement {
actor actor
} }
private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
if (_remoteAddress.isDefined) { if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
@ -826,7 +825,7 @@ trait Actor extends TransactionManagement {
} }
} }
private def postMessageToMailboxAndCreateFutureResultWithTimeout( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {

View file

@ -24,7 +24,56 @@ import java.net.{SocketAddress, InetSocketAddress}
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala. collection.mutable.{HashSet, HashMap} import scala.collection.mutable.{HashSet, HashMap}
/*
class RemoteActorHandle(id: String, className: String, timeout: Long, hostname: String, port: Int) extends Actor {
start
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(id)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send(requestBuilder.build, None)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(id)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
def receive = { case _ => {} }
}
*/
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -32,6 +81,7 @@ import scala. collection.mutable.{HashSet, HashMap}
object RemoteRequestIdFactory { object RemoteRequestIdFactory {
private val nodeId = UUID.newUuid private val nodeId = UUID.newUuid
private val id = new AtomicLong private val id = new AtomicLong
def nextId: Long = id.getAndIncrement + nodeId def nextId: Long = id.getAndIncrement + nodeId
} }
@ -45,6 +95,65 @@ object RemoteClient extends Logging {
private val remoteClients = new HashMap[String, RemoteClient] private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
// FIXME: simplify overloaded methods when we have Scala 2.8
/*
def actorFor(className: String, hostname: String, port: Int): Actor =
actorFor(className, className, 5000, hostname, port)
def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor =
actorFor(actorId, className, 5000, hostname, port)
*/
def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor =
actorFor(className, className, timeout, hostname, port)
def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = {
new Actor {
start
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(actorId)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send(requestBuilder.build, None)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(actorId)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
def receive = {case _ => {}}
}
}
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
def clientFor(address: InetSocketAddress): RemoteClient = synchronized { def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
@ -155,7 +264,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
} else { } else {
futures.synchronized { futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(request.getTimeout) else new DefaultCompletableFutureResult(request.getTimeout)
futures.put(request.getId, futureResult) futures.put(request.getId, futureResult)
connection.getChannel.write(request) connection.getChannel.write(request)
Some(futureResult) Some(futureResult)
@ -185,21 +294,21 @@ class RemoteClientPipelineFactory(name: String,
timer: HashedWheelTimer, timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory { client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = { def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4) val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance)
val protobufEnc = new ProtobufEncoder val protobufEnc = new ProtobufEncoder
val zipCodec = RemoteServer.COMPRESSION_SCHEME match { val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
case _ => None case _ => None
} }
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val stages: Array[ChannelHandler] = val stages: Array[ChannelHandler] =
zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
.getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
new StaticChannelPipeline(stages: _*) new StaticChannelPipeline(stages: _*)
} }
} }
@ -207,7 +316,7 @@ class RemoteClientPipelineFactory(name: String,
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ChannelPipelineCoverage { val value = "all" } @ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String, class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult], val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor], val supervisors: ConcurrentMap[String, Actor],
@ -215,7 +324,7 @@ class RemoteClientHandler(val name: String,
val remoteAddress: SocketAddress, val remoteAddress: SocketAddress,
val timer: HashedWheelTimer, val timer: HashedWheelTimer,
val client: RemoteClient) val client: RemoteClient)
extends SimpleChannelUpstreamHandler with Logging { extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self import Actor.Sender.Self
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@ -284,7 +393,7 @@ class RemoteClientHandler(val name: String,
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$'))) val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length) val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
exceptionType exceptionType
.getConstructor(Array[Class[_]](classOf[String]): _*) .getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exceptionMessage).asInstanceOf[Throwable] .newInstance(exceptionMessage).asInstanceOf[Throwable]
} }
} }

View file

@ -182,13 +182,19 @@ class RemoteServer extends Logging {
} }
} }
def shutdown = { def shutdown = if (isRunning) {
RemoteServer.unregister(hostname, port) RemoteServer.unregister(hostname, port)
openChannels.disconnect openChannels.disconnect
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port) Cluster.deregisterLocalNode(hostname, port)
} }
// TODO: register active object in RemoteServer as well
def register(actor: Actor) = if (isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
}
} }
case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) case class Codec(encoder : ChannelHandler, decoder : ChannelHandler)
@ -256,8 +262,7 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage val message = event.getMessage
if (message eq null) throw new IllegalStateException( if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
"Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) { if (message.isInstanceOf[RemoteRequest]) {
handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
} }

View file

@ -4,7 +4,7 @@ import junit.framework.Test
import junit.framework.TestCase import junit.framework.TestCase
import junit.framework.TestSuite import junit.framework.TestSuite
import se.scalablesolutions.akka.actor.{RemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest} import se.scalablesolutions.akka.actor.{ClientInitiatedRemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest}
object AllTest extends TestCase { object AllTest extends TestCase {
def suite(): Test = { def suite(): Test = {
@ -16,7 +16,7 @@ object AllTest extends TestCase {
suite.addTestSuite(classOf[ThreadBasedActorTest]) suite.addTestSuite(classOf[ThreadBasedActorTest])
suite.addTestSuite(classOf[ReactorBasedSingleThreadEventDrivenDispatcherTest]) suite.addTestSuite(classOf[ReactorBasedSingleThreadEventDrivenDispatcherTest])
suite.addTestSuite(classOf[ReactorBasedThreadPoolEventDrivenDispatcherTest]) suite.addTestSuite(classOf[ReactorBasedThreadPoolEventDrivenDispatcherTest])
suite.addTestSuite(classOf[RemoteActorTest]) suite.addTestSuite(classOf[ClientInitiatedRemoteActorTest])
suite.addTestSuite(classOf[InMemoryActorTest]) suite.addTestSuite(classOf[InMemoryActorTest])
suite.addTestSuite(classOf[SchedulerTest]) suite.addTestSuite(classOf[SchedulerTest])
//suite.addTestSuite(classOf[TransactionClasherTest]) //suite.addTestSuite(classOf[TransactionClasherTest])

View file

@ -9,46 +9,47 @@ import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.dispatch.Dispatchers
object Global { object ClientInitiatedRemoteActorTest {
var oneWay = "nada" object Global {
var remoteReply = "nada" var oneWay = "nada"
} var remoteReply = "nada"
class RemoteActorSpecActorUnidirectional extends Actor { }
dispatcher = Dispatchers.newThreadBasedDispatcher(this) case class Send(actor: Actor)
def receive = { class RemoteActorSpecActorUnidirectional extends Actor {
case "OneWay" => dispatcher = Dispatchers.newThreadBasedDispatcher(this)
Global.oneWay = "received"
def receive = {
case "OneWay" =>
Global.oneWay = "received"
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
}
class RemoteActorSpecActorAsyncSender extends Actor {
def receive = {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply = "replied"
}
def send(actor: Actor) {
this ! Send(actor)
}
} }
} }
class RemoteActorSpecActorBidirectional extends Actor { class ClientInitiatedRemoteActorTest extends JUnitSuite {
def receive = { import ClientInitiatedRemoteActorTest._
case "Hello" =>
reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
}
case class Send(actor: Actor)
class RemoteActorSpecActorAsyncSender extends Actor {
def receive = {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply = "replied"
}
def send(actor: Actor) {
this ! Send(actor)
}
}
class RemoteActorTest extends JUnitSuite {
import Actor.Sender.Self
akka.Config.config akka.Config.config
val HOSTNAME = "localhost" val HOSTNAME = "localhost"
@ -57,6 +58,8 @@ class RemoteActorTest extends JUnitSuite {
var s1: RemoteServer = null var s1: RemoteServer = null
var s2: RemoteServer = null var s2: RemoteServer = null
import Actor.Sender.Self
@Before @Before
def init() { def init() {
s1 = new RemoteServer() s1 = new RemoteServer()
@ -116,26 +119,7 @@ class RemoteActorTest extends JUnitSuite {
actor.stop actor.stop
} }
/*
This test does not throw an exception since the
_contactAddress is always defined via the
global configuration if not set explicitly.
@Test
def shouldSendRemoteReplyException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val sender = new RemoteActorSpecActorAsyncSender
sender.start
sender.send(actor)
Thread.sleep(500)
assert("exception" === Global.remoteReply)
actor.stop
}
*/
@Test @Test
def shouldSendReceiveException = { def shouldSendReceiveException = {
implicit val timeout = 500000000L implicit val timeout = 500000000L

View file

@ -0,0 +1,144 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.TimeUnit
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
object ServerInitiatedRemoteActorTest {
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
object Global {
var oneWay = "nada"
var remoteReply = "nada"
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
start
def receive = {
case "OneWay" =>
println("================== ONEWAY")
Global.oneWay = "received"
}
}
class RemoteActorSpecActorBidirectional extends Actor {
start
def receive = {
case "Hello" =>
reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
}
case class Send(actor: Actor)
class RemoteActorSpecActorAsyncSender extends Actor {
start
def receive = {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply = "replied"
}
def send(actor: Actor) {
this ! Send(actor)
}
}
}
class ServerInitiatedRemoteActorTest extends JUnitSuite {
import ServerInitiatedRemoteActorTest._
import Actor.Sender.Self
akka.Config.config
private val unit = TimeUnit.MILLISECONDS
@Before
def init() {
server = new RemoteServer()
server.start(HOSTNAME, PORT)
server.register(new RemoteActorSpecActorUnidirectional)
server.register(new RemoteActorSpecActorBidirectional)
server.register(new RemoteActorSpecActorAsyncSender)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
@After
def finished() {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
}
@Test
def shouldSendOneWay = {
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorUnidirectional",
5000L,
HOSTNAME, PORT)
val result = actor ! "OneWay"
Thread.sleep(1000)
assert("received" === Global.oneWay)
actor.stop
}
@Test
def shouldSendReplyAsync = {
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
5000L,
HOSTNAME, PORT)
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test
def shouldSendRemoteReply = {
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
val sender = new RemoteActorSpecActorAsyncSender
sender.setReplyToAddress(HOSTNAME, PORT)
sender.start
sender.send(actor)
Thread.sleep(1000)
assert("replied" === Global.remoteReply)
actor.stop
}
@Test
def shouldSendReceiveException = {
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorTest$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("expected" === e.getMessage())
}
actor.stop
}
}