Fixed bug in event-driven dispatcher + fixed bug in makeRemote when run on a remote instance

This commit is contained in:
Jonas Bonér 2009-12-15 10:31:24 +01:00
parent c1e74fb2ab
commit b8eea97ecc
3 changed files with 111 additions and 128 deletions

View file

@ -20,14 +20,14 @@ import se.scalablesolutions.akka.util.{HashCode, Logging}
import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.api.ThreadLocalTransaction._
import java.util.{Queue, LinkedList, HashSet}
import java.util.{Queue, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
* <p/>
* Can also be achived by invoking <code>makeTransactionRequired</code>
* in the body of the <code>Actor</code>.
* Equivalent to invoking the <code>makeTransactionRequired</code> method in the body of the <code>Actor</code
*/
trait Transactor extends Actor {
makeTransactionRequired
@ -35,7 +35,8 @@ trait Transactor extends Actor {
/**
* Extend this abstract class to create a remote actor.
* Equivalent to invoking the 'makeRemote(..)' method in or on the actor.
* <p/>
* Equivalent to invoking the <code>makeRemote(..)</code> method in the body of the <code>Actor</code
*/
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
makeRemote(hostname, port)
@ -231,12 +232,11 @@ trait Actor extends TransactionManagement {
@volatile private var _isShutDown: Boolean = false
private var _isEventBased: Boolean = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private val _remoteFlagLock = new ReadWriteLock
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] var _contactAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
// ====================================
// protected fields
@ -416,7 +416,7 @@ trait Actor extends TransactionManagement {
/**
* Starts up the actor and its message queue.
*/
def start: Actor = _mailbox.synchronized {
def start: Actor = synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
messageDispatcher.register(this)
@ -438,7 +438,7 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = _mailbox.synchronized {
def stop = synchronized {
if (_isRunning) {
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
@ -449,6 +449,8 @@ trait Actor extends TransactionManagement {
}
}
def isRunning = _isRunning
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
@ -586,7 +588,7 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(dispatcher: MessageDispatcher): Unit = _mailbox.synchronized {
def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = dispatcher
@ -599,27 +601,23 @@ trait Actor extends TransactionManagement {
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit = _remoteFlagLock.withWriteLock {
makeRemote(new InetSocketAddress(hostname, port))
}
def makeRemote(hostname: String, port: Int): Unit =
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else makeRemote(new InetSocketAddress(hostname, port))
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = _remoteFlagLock.withWriteLock {
_remoteAddress = Some(address)
}
def makeRemote(address: InetSocketAddress): Unit =
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else _remoteAddress = Some(address)
/**
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
*/
def setContactAddress(hostname:String, port:Int): Unit = {
setContactAddress(new InetSocketAddress(hostname, port))
}
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
*/
def setContactAddress(hostname:String, port:Int): Unit = setContactAddress(new InetSocketAddress(hostname, port))
def setContactAddress(address: InetSocketAddress): Unit = {
_contactAddress = Some(address)
}
def setContactAddress(address: InetSocketAddress): Unit = _contactAddress = Some(address)
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@ -629,7 +627,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = _mailbox.synchronized {
def makeTransactionRequired = synchronized {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@ -770,7 +768,7 @@ trait Actor extends TransactionManagement {
actor
}
private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -783,45 +781,40 @@ trait Actor extends TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
if(sender.isDefined) {
requestBuilder.setSourceTarget(sender.get.getClass.getName)
requestBuilder.setSourceUuid(sender.get.uuid)
log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
if(sender.isDefined) {
requestBuilder.setSourceTarget(sender.get.getClass.getName)
requestBuilder.setSourceUuid(sender.get.uuid)
log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
if (sender.get._contactAddress.isDefined) {
val addr = sender.get._contactAddress.get
requestBuilder.setSourceHostname(addr.getHostName())
requestBuilder.setSourcePort(addr.getPort())
} else {
// set the contact address to the default values from the
// configuration file
requestBuilder.setSourceHostname(Actor.HOSTNAME)
requestBuilder.setSourcePort(Actor.PORT)
}
if (sender.get._contactAddress.isDefined) {
val addr = sender.get._contactAddress.get
requestBuilder.setSourceHostname(addr.getHostName())
requestBuilder.setSourcePort(addr.getPort())
} else {
// set the contact address to the default values from the
// configuration file
requestBuilder.setSourceHostname(Actor.HOSTNAME)
requestBuilder.setSourcePort(Actor.PORT)
}
}
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
if (_isEventBased) {
_mailbox.synchronized {
_mailbox.add(invocation)
if (_isSuspended) {
_resume
invocation.send
}
_mailbox.add(invocation)
if (_isSuspended) {
invocation.send
}
}
else invocation.send
} else invocation.send
}
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long): CompletableFutureResult = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -842,22 +835,17 @@ trait Actor extends TransactionManagement {
val future = new DefaultCompletableFutureResult(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.synchronized {
_mailbox.add(invocation)
if (_isSuspended) {
_resume
invocation.send
}
}
_mailbox.add(invocation)
invocation.send
} else invocation.send
future
}
}
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = {
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@ -977,13 +965,13 @@ trait Actor extends TransactionManagement {
}
}
private[Actor] def restart(reason: AnyRef) = _mailbox.synchronized {
private[Actor] def restart(reason: AnyRef) = synchronized {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = _mailbox.synchronized {
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)

View file

@ -63,12 +63,12 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
val mailbox = invocation.receiver._mailbox
mailbox.synchronized {
val messages = mailbox.toArray
messages.foreach(message => message.asInstanceOf[MessageInvocation].invoke)
mailbox.clear
invocation.receiver._suspend
invocation.receiver.synchronized {
val messages = invocation.receiver._mailbox.iterator
while (messages.hasNext) {
messages.next.asInstanceOf[MessageInvocation].invoke
messages.remove
}
}
}
})
@ -85,5 +85,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
}

View file

@ -29,7 +29,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
* </pre>
*
* If you need to create more than one, then you can use the RemoteServer:
*
*
* <pre>
* val server = new RemoteServer
* server.start
@ -41,7 +41,7 @@ object RemoteNode extends RemoteServer
/**
* This object holds configuration variables.
*
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
@ -91,7 +91,7 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels:ChannelGroup = new DefaultChannelGroup("akka-server")
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-server")
def start: Unit = start(None)
@ -111,13 +111,13 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true
}
}
def shutdown = {
openChannels.close.awaitUninterruptibly()
openChannels.close.awaitUninterruptibly()
bootstrap.releaseExternalResources
}
}
@ -129,7 +129,7 @@ class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, load
extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
@ -153,21 +153,21 @@ class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, load
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage { val value = "all" }
@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
/*
* channelOpen overriden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is
* automatically removed from the open channels group.
*/
/**
* ChannelOpen overridden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is
* automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
openChannels.add(ctx.getChannel)
openChannels.add(ctx.getChannel)
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@ -202,37 +202,33 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
if(request.hasSourceHostname && request.hasSourcePort) {
// re-create the sending actor
val targetClass = if(request.hasSourceTarget) request.getSourceTarget
else request.getTarget
/* val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(targetClass)
else Class.forName(targetClass)
val remoteActor = clazz.newInstance.asInstanceOf[Actor]
log.debug("Re-creating sending actor [%s]", targetClass)
remoteActor._uuid = request.getSourceUuid
remoteActor.timeout = request.getTimeout
*/
val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActor.start
if (request.hasSourceHostname && request.hasSourcePort) {
// re-create the sending actor
val targetClass = if (request.hasSourceTarget) request.getSourceTarget
else request.getTarget
val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
if (!remoteActor.isRunning) {
remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActor.start
}
actor.!(message)(remoteActor)
} else {
// couldnt find a way to reply, send the message without a source/sender
actor.send(message)
}
}
else {
} else {
// couldn't find a way to reply, send the message without a source/sender
actor.send(message)
}
} else {
try {
val resultOrNone = actor !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(true)
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
@ -241,15 +237,15 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
case e: Throwable =>
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(true)
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
}
}
}
}
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
@ -269,9 +265,9 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(false)
.setId(request.getId)
.setIsSuccessful(true)
.setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
@ -281,20 +277,20 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
case e: InvocationTargetException =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
@ -325,10 +321,10 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName)
unescapedArgClasses(i) = Class.forName(proxyName)
} else {
unescapedArgs(i) = args(i)
unescapedArgClasses(i) = argClasses(i)
unescapedArgClasses(i) = argClasses(i)
}
}
(unescapedArgs, unescapedArgClasses)
@ -340,7 +336,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
try {
log.info("Creating a new remote active object [%s]", name)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
else Class.forName(name)
val newInstance = ActiveObject.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
@ -358,7 +354,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance._uuid = uuid
newInstance.timeout = timeout