Now doing a 'reply(..)' to remote sender after receiving a remote message through '!' works. Added tests.

Also removed the Logging trait from Actor for lower memory footprint.
This commit is contained in:
Jonas Bonér 2010-04-06 12:45:09 +02:00
parent 85cb032964
commit 9c57c3ba97
11 changed files with 191 additions and 147 deletions

View file

@ -76,7 +76,7 @@ object AMQP {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class AMQPSupervisor extends Actor { class AMQPSupervisor extends Actor with Logging {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor] private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]
@ -516,7 +516,7 @@ object AMQP {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait FaultTolerantConnectionActor extends Actor { trait FaultTolerantConnectionActor extends Actor with Logging {
val reconnectionTimer = new Timer val reconnectionTimer = new Timer
var connection: Connection = _ var connection: Connection = _

View file

@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
@ -242,7 +242,7 @@ object Actor extends Logging {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait Actor extends TransactionManagement with Logging { trait Actor extends TransactionManagement {
implicit protected val self: Option[Actor] = Some(this) implicit protected val self: Option[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString private[akka] var _uuid = UUID.newUuid.toString
@ -834,6 +834,12 @@ trait Actor extends TransactionManagement with Logging {
requestBuilder.setSourceHostname(host) requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port) requestBuilder.setSourcePort(port)
if (RemoteServer.serverFor(host, port).isEmpty) {
val server = new RemoteServer
server.start(host, port)
}
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get)
} }
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)

View file

@ -95,7 +95,7 @@ private[akka] object ClusterActor {
* Provides most of the behavior out of the box * Provides most of the behavior out of the box
* only needs to be gives hooks into the underlaying cluster impl. * only needs to be gives hooks into the underlaying cluster impl.
*/ */
abstract class BasicClusterActor extends ClusterActor { abstract class BasicClusterActor extends ClusterActor with Logging {
import ClusterActor._ import ClusterActor._
type ADDR_T type ADDR_T

View file

@ -106,7 +106,7 @@ object RemoteServer {
} }
} }
private[remote] def serverFor(hostname: String, port: Int): Option[RemoteServer] = { private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
val server = remoteServers.get(Address(hostname, port)) val server = remoteServers.get(Address(hostname, port))
if (server eq null) None if (server eq null) None
else Some(server) else Some(server)
@ -141,8 +141,7 @@ class RemoteServer extends Logging {
private var hostname = RemoteServer.HOSTNAME private var hostname = RemoteServer.HOSTNAME
private var port = RemoteServer.PORT private var port = RemoteServer.PORT
@volatile private var isRunning = false @volatile private var _isRunning = false
@volatile private var isConfigured = false
private val factory = new NioServerSocketChannelFactory( private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool, Executors.newCachedThreadPool,
@ -153,6 +152,8 @@ class RemoteServer extends Logging {
// group of open channels, used for clean-up // group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server") private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
def isRunning = _isRunning
def start: Unit = start(None) def start: Unit = start(None)
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader) def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
@ -161,7 +162,7 @@ class RemoteServer extends Logging {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized { def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized {
try { try {
if (!isRunning) { if (!_isRunning) {
hostname = _hostname hostname = _hostname
port = _port port = _port
log.info("Starting remote server at [%s:%s]", hostname, port) log.info("Starting remote server at [%s:%s]", hostname, port)
@ -174,7 +175,7 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true _isRunning = true
Cluster.registerLocalNode(hostname, port) Cluster.registerLocalNode(hostname, port)
} }
} catch { } catch {
@ -182,31 +183,37 @@ class RemoteServer extends Logging {
} }
} }
def shutdown = if (isRunning) { def shutdown = synchronized {
if (_isRunning) {
RemoteServer.unregister(hostname, port) RemoteServer.unregister(hostname, port)
openChannels.disconnect openChannels.disconnect
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port) Cluster.deregisterLocalNode(hostname, port)
} }
}
// TODO: register active object in RemoteServer as well // TODO: register active object in RemoteServer as well
/** /**
* Register Remote Actor by the Actor's 'id' field. * Register Remote Actor by the Actor's 'id' field.
*/ */
def register(actor: Actor) = if (isRunning) { def register(actor: Actor) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId) log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
} }
}
/** /**
* Register Remote Actor by a specific 'id' passed as argument. * Register Remote Actor by a specific 'id' passed as argument.
*/ */
def register(id: String, actor: Actor) = if (isRunning) { def register(id: String, actor: Actor) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
} }
}
} }
case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) case class Codec(encoder : ChannelHandler, decoder : ChannelHandler)
@ -294,6 +301,7 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteRequest, channel: Channel) = { private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote actor [%s]", request.getTarget) log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout) val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
val message = RemoteProtocolBuilder.getMessage(request) val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) { if (request.getIsOneWay) {
@ -389,19 +397,6 @@ class RemoteServerHandler(
} }
} }
/*
private def continueTransaction(request: RemoteRequest) = {
val tx = request.tx
if (tx.isDefined) {
tx.get.reinit
TransactionManagement.threadBoundTx.set(tx)
setThreadLocalTransaction(tx.transaction)
} else {
TransactionManagement.threadBoundTx.set(None)
setThreadLocalTransaction(null)
}
}
*/
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = { private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
val unescapedArgs = new Array[AnyRef](args.size) val unescapedArgs = new Array[AnyRef](args.size)
val unescapedArgClasses = new Array[Class[_]](args.size) val unescapedArgClasses = new Array[Class[_]](args.size)
@ -410,7 +405,7 @@ class RemoteServerHandler(
val arg = args(i) val arg = args(i)
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) { if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
val argString = arg.asInstanceOf[String] val argString = arg.asInstanceOf[String]
val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length) val proxyName = argString.replace(AW_PROXY_PREFIX, "")
val activeObject = createActiveObject(proxyName, timeout) val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName) unescapedArgClasses(i) = Class.forName(proxyName)
@ -440,6 +435,11 @@ class RemoteServerHandler(
} else activeObjectOrNull } else activeObjectOrNull
} }
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
* If actor already created then just return it from the registry.
* Does not start the actor.
*/
private def createActor(name: String, uuid: String, timeout: Long): Actor = { private def createActor(name: String, uuid: String, timeout: Long): Actor = {
val actorOrNull = actors.get(uuid) val actorOrNull = actors.get(uuid)
if (actorOrNull eq null) { if (actorOrNull eq null) {
@ -452,7 +452,6 @@ class RemoteServerHandler(
newInstance.timeout = timeout newInstance.timeout = timeout
newInstance._remoteAddress = None newInstance._remoteAddress = None
actors.put(uuid, newInstance) actors.put(uuid, newInstance)
newInstance.start
newInstance newInstance
} catch { } catch {
case e => case e =>

View file

@ -51,10 +51,9 @@ object TransactionManagement extends TransactionManagement {
} }
} }
trait TransactionManagement extends Logging { trait TransactionManagement {
private[akka] def createNewTransactionSet: CountDownCommitBarrier = { private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
log.trace("Creating new transaction set")
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS) val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
TransactionManagement.transactionSet.set(Some(txSet)) TransactionManagement.transactionSet.set(Some(txSet))
txSet txSet
@ -67,12 +66,10 @@ trait TransactionManagement extends Logging {
if (tx.isDefined) TransactionManagement.transaction.set(tx) if (tx.isDefined) TransactionManagement.transaction.set(tx)
private[akka] def clearTransactionSet = { private[akka] def clearTransactionSet = {
log.trace("Clearing transaction set")
TransactionManagement.transactionSet.set(None) TransactionManagement.transactionSet.set(None)
} }
private[akka] def clearTransaction = { private[akka] def clearTransaction = {
log.trace("Clearing transaction")
TransactionManagement.transaction.set(None) TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null) setThreadLocalTransaction(null)
} }

View file

@ -46,6 +46,27 @@ class RemoteActorSpecActorAsyncSender extends Actor {
} }
} }
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
reply("World")
}
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: Actor = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
latch.countDown
}
}
class ClientInitiatedRemoteActorSpec extends JUnitSuite { class ClientInitiatedRemoteActorSpec extends JUnitSuite {
se.scalablesolutions.akka.config.Config.config se.scalablesolutions.akka.config.Config.config
@ -82,11 +103,30 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
val actor = new RemoteActorSpecActorUnidirectional val actor = new RemoteActorSpecActorUnidirectional
actor.makeRemote(HOSTNAME, PORT1) actor.makeRemote(HOSTNAME, PORT1)
actor.start actor.start
val result = actor ! "OneWay" actor ! "OneWay"
assert(Global.oneWay.await(1, TimeUnit.SECONDS)) assert(Global.oneWay.await(1, TimeUnit.SECONDS))
actor.stop actor.stop
} }
@Test
def shouldSendOneWayAndReceiveReply = {
val actor = new SendOneWayAndReplyReceiverActor
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val latch = new CountDownLatch(1)
val sender = new SendOneWayAndReplySenderActor
sender.setReplyToAddress(HOSTNAME, PORT2)
sender.sendTo = actor
sender.latch = latch
sender.start
sender.sendOff
assert(latch.await(1, TimeUnit.SECONDS))
assert(sender.state.isDefined === true)
assert("World" === sender.state.get.asInstanceOf[String])
actor.stop
sender.stop
}
@Test @Test
def shouldSendReplyAsync = { def shouldSendReplyAsync = {
val actor = new RemoteActorSpecActorBidirectional val actor = new RemoteActorSpecActorBidirectional
@ -101,6 +141,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
def shouldSendRemoteReply = { def shouldSendRemoteReply = {
implicit val timeout = 500000000L implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional val actor = new RemoteActorSpecActorBidirectional
actor.setReplyToAddress(HOSTNAME, PORT2)
actor.makeRemote(HOSTNAME, PORT2) actor.makeRemote(HOSTNAME, PORT2)
actor.start actor.start
@ -128,3 +169,4 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop actor.stop
} }
} }

View file

@ -77,7 +77,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
new Thread(new Runnable() { new Thread(new Runnable() {
def run = { def run = {
RemoteNode.start RemoteNode.start(RemoteServer.HOSTNAME, 9988)
} }
}).start }).start
Thread.sleep(1000) Thread.sleep(1000)
@ -335,7 +335,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
// implementation of the Actors we want to use. // implementation of the Actors we want to use.
pingpong1 = new RemotePingPong1Actor pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -350,7 +350,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getSingleActorOneForOneSupervisor: Supervisor = { def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = new RemotePingPong1Actor pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -364,11 +364,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsAllForOneConf: Supervisor = { def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -390,11 +390,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getMultipleActorsOneForOneConf: Supervisor = { def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(
@ -416,11 +416,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
def getNestedSupervisorsAllForOneConf: Supervisor = { def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory( val factory = SupervisorFactory(
SupervisorConfig( SupervisorConfig(

View file

@ -65,7 +65,7 @@ class Transformer(producer: Actor) extends Actor {
} }
} }
class Subscriber(name:String, uri: String) extends Actor with Consumer { class Subscriber(name:String, uri: String) extends Actor with Consumer with Logging {
def endpointUri = uri def endpointUri = uri
protected def receive = { protected def receive = {

View file

@ -127,7 +127,7 @@ class PersistentSimpleService extends Transactor {
} }
@Path("/chat") @Path("/chat")
class Chat extends Actor { class Chat extends Actor with Logging {
case class Chat(val who: String, val what: String, val msg: String) case class Chat(val who: String, val what: String, val msg: String)
@Suspend @Suspend

View file

@ -245,7 +245,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] {
* class to create an authenticator. Don't forget to set the authenticator FQN in the * class to create an authenticator. Don't forget to set the authenticator FQN in the
* rest-part of the akka config * rest-part of the akka config
*/ */
trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] { trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging {
import Enc._ import Enc._
private object InvalidateNonces private object InvalidateNonces
@ -346,7 +346,7 @@ import org.ietf.jgss.GSSContext
import org.ietf.jgss.GSSCredential import org.ietf.jgss.GSSCredential
import org.ietf.jgss.GSSManager import org.ietf.jgss.GSSManager
trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] { trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] with Logging {
override def unauthorized = override def unauthorized =
Response.status(401).header("WWW-Authenticate", "Negotiate").build Response.status(401).header("WWW-Authenticate", "Negotiate").build