diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 2847f89c69..82f04a2d54 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -20,14 +20,14 @@ import se.scalablesolutions.akka.util.{HashCode, Logging} import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.api.ThreadLocalTransaction._ -import java.util.{Queue, LinkedList, HashSet} + +import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue /** * Implements the Transactor abstraction. E.g. a transactional actor. *
- * Can also be achived by invokingmakeTransactionRequired
- * in the body of the Actor.
+ * Equivalent to invoking the makeTransactionRequired method in the body of the Actor
+ * Equivalent to invoking the makeRemote(..) method in the body of the Actor
@@ -586,7 +588,7 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(dispatcher: MessageDispatcher): Unit = _mailbox.synchronized {
+ def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = dispatcher
@@ -599,27 +601,23 @@ trait Actor extends TransactionManagement {
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(hostname: String, port: Int): Unit = _remoteFlagLock.withWriteLock {
- makeRemote(new InetSocketAddress(hostname, port))
- }
+ def makeRemote(hostname: String, port: Int): Unit =
+ if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
+ else makeRemote(new InetSocketAddress(hostname, port))
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(address: InetSocketAddress): Unit = _remoteFlagLock.withWriteLock {
- _remoteAddress = Some(address)
- }
+ def makeRemote(address: InetSocketAddress): Unit =
+ if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
+ else _remoteAddress = Some(address)
/**
- * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
- */
- def setContactAddress(hostname:String, port:Int): Unit = {
- setContactAddress(new InetSocketAddress(hostname, port))
- }
+ * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+ */
+ def setContactAddress(hostname:String, port:Int): Unit = setContactAddress(new InetSocketAddress(hostname, port))
- def setContactAddress(address: InetSocketAddress): Unit = {
- _contactAddress = Some(address)
- }
+ def setContactAddress(address: InetSocketAddress): Unit = _contactAddress = Some(address)
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@@ -629,7 +627,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = _mailbox.synchronized {
+ def makeTransactionRequired = synchronized {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@@ -770,7 +768,7 @@ trait Actor extends TransactionManagement {
actor
}
- private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -783,45 +781,40 @@ trait Actor extends TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
- // set the source fields used to reply back to the original sender
- // (i.e. not the remote proxy actor)
- if(sender.isDefined) {
- requestBuilder.setSourceTarget(sender.get.getClass.getName)
- requestBuilder.setSourceUuid(sender.get.uuid)
- log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
+ // set the source fields used to reply back to the original sender
+ // (i.e. not the remote proxy actor)
+ if(sender.isDefined) {
+ requestBuilder.setSourceTarget(sender.get.getClass.getName)
+ requestBuilder.setSourceUuid(sender.get.uuid)
+ log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
- if (sender.get._contactAddress.isDefined) {
- val addr = sender.get._contactAddress.get
- requestBuilder.setSourceHostname(addr.getHostName())
- requestBuilder.setSourcePort(addr.getPort())
- } else {
- // set the contact address to the default values from the
- // configuration file
- requestBuilder.setSourceHostname(Actor.HOSTNAME)
- requestBuilder.setSourcePort(Actor.PORT)
- }
+ if (sender.get._contactAddress.isDefined) {
+ val addr = sender.get._contactAddress.get
+ requestBuilder.setSourceHostname(addr.getHostName())
+ requestBuilder.setSourcePort(addr.getPort())
+ } else {
+ // set the contact address to the default values from the
+ // configuration file
+ requestBuilder.setSourceHostname(Actor.HOSTNAME)
+ requestBuilder.setSourcePort(Actor.PORT)
+ }
- }
+ }
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
if (_isEventBased) {
- _mailbox.synchronized {
- _mailbox.add(invocation)
- if (_isSuspended) {
- _resume
- invocation.send
- }
+ _mailbox.add(invocation)
+ if (_isSuspended) {
+ invocation.send
}
- }
- else invocation.send
+ } else invocation.send
}
}
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long):
- CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long): CompletableFutureResult = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -842,22 +835,17 @@ trait Actor extends TransactionManagement {
val future = new DefaultCompletableFutureResult(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
- _mailbox.synchronized {
- _mailbox.add(invocation)
- if (_isSuspended) {
- _resume
- invocation.send
- }
- }
+ _mailbox.add(invocation)
+ invocation.send
} else invocation.send
future
}
}
/**
- * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
+ * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
- private[akka] def invoke(messageHandle: MessageInvocation) = {
+ private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@@ -977,13 +965,13 @@ trait Actor extends TransactionManagement {
}
}
- private[Actor] def restart(reason: AnyRef) = _mailbox.synchronized {
+ private[Actor] def restart(reason: AnyRef) = synchronized {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
}
- private[akka] def registerSupervisorAsRemoteActor: Option[String] = _mailbox.synchronized {
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
diff --git a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index ea98d855bf..0a2a6f0ff0 100644
--- a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -63,12 +63,12 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
- val mailbox = invocation.receiver._mailbox
- mailbox.synchronized {
- val messages = mailbox.toArray
- messages.foreach(message => message.asInstanceOf[MessageInvocation].invoke)
- mailbox.clear
- invocation.receiver._suspend
+ invocation.receiver.synchronized {
+ val messages = invocation.receiver._mailbox.iterator
+ while (messages.hasNext) {
+ messages.next.asInstanceOf[MessageInvocation].invoke
+ messages.remove
+ }
}
}
})
@@ -85,5 +85,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
-
}
\ No newline at end of file
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index 4c5a6dd701..d29087f828 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -29,7 +29,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
*
*
* If you need to create more than one, then you can use the RemoteServer:
- *
+ *
*
* val server = new RemoteServer
* server.start
@@ -41,7 +41,7 @@ object RemoteNode extends RemoteServer
/**
* This object holds configuration variables.
- *
+ *
* @author Jonas Bonér
*/
object RemoteServer {
@@ -91,7 +91,7 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
- private val openChannels:ChannelGroup = new DefaultChannelGroup("akka-server")
+ private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-server")
def start: Unit = start(None)
@@ -111,13 +111,13 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
- openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
+ openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true
}
}
def shutdown = {
- openChannels.close.awaitUninterruptibly()
+ openChannels.close.awaitUninterruptibly()
bootstrap.releaseExternalResources
}
}
@@ -129,7 +129,7 @@ class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, load
extends ChannelPipelineFactory {
import RemoteServer._
- def getPipeline: ChannelPipeline = {
+ def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
@@ -153,21 +153,21 @@ class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, load
/**
* @author Jonas Bonér
*/
-@ChannelPipelineCoverage { val value = "all" }
+@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
-
+
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
-
- /*
- * channelOpen overriden to store open channels for a clean shutdown
- * of a RemoteServer. If a channel is closed before, it is
- * automatically removed from the open channels group.
- */
+
+ /**
+ * ChannelOpen overridden to store open channels for a clean shutdown
+ * of a RemoteServer. If a channel is closed before, it is
+ * automatically removed from the open channels group.
+ */
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
- openChannels.add(ctx.getChannel)
+ openChannels.add(ctx.getChannel)
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@@ -202,37 +202,33 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
+
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
- if(request.hasSourceHostname && request.hasSourcePort) {
- // re-create the sending actor
- val targetClass = if(request.hasSourceTarget) request.getSourceTarget
- else request.getTarget
-/* val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(targetClass)
- else Class.forName(targetClass)
- val remoteActor = clazz.newInstance.asInstanceOf[Actor]
- log.debug("Re-creating sending actor [%s]", targetClass)
- remoteActor._uuid = request.getSourceUuid
- remoteActor.timeout = request.getTimeout
-*/
- val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
- remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
- remoteActor.start
+ if (request.hasSourceHostname && request.hasSourcePort) {
+ // re-create the sending actor
+ val targetClass = if (request.hasSourceTarget) request.getSourceTarget
+ else request.getTarget
+
+ val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
+ if (!remoteActor.isRunning) {
+ remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
+ remoteActor.start
+ }
actor.!(message)(remoteActor)
- } else {
- // couldnt find a way to reply, send the message without a source/sender
- actor.send(message)
- }
- }
- else {
+ } else {
+ // couldn't find a way to reply, send the message without a source/sender
+ actor.send(message)
+ }
+ } else {
try {
val resultOrNone = actor !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
- .setId(request.getId)
- .setIsSuccessful(true)
- .setIsActor(true)
+ .setId(request.getId)
+ .setIsSuccessful(true)
+ .setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
@@ -241,15 +237,15 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
case e: Throwable =>
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReply.newBuilder
- .setId(request.getId)
- .setException(e.getClass.getName + "$" + e.getMessage)
- .setIsSuccessful(false)
- .setIsActor(true)
+ .setId(request.getId)
+ .setException(e.getClass.getName + "$" + e.getMessage)
+ .setIsSuccessful(false)
+ .setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
}
- }
+ }
}
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
@@ -269,9 +265,9 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
- .setId(request.getId)
- .setIsSuccessful(true)
- .setIsActor(false)
+ .setId(request.getId)
+ .setIsSuccessful(true)
+ .setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
@@ -281,20 +277,20 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
case e: InvocationTargetException =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
- .setId(request.getId)
- .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
- .setIsSuccessful(false)
- .setIsActor(false)
+ .setId(request.getId)
+ .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
+ .setIsSuccessful(false)
+ .setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
- .setId(request.getId)
- .setException(e.getClass.getName + "$" + e.getMessage)
- .setIsSuccessful(false)
- .setIsActor(false)
+ .setId(request.getId)
+ .setException(e.getClass.getName + "$" + e.getMessage)
+ .setIsSuccessful(false)
+ .setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
@@ -325,10 +321,10 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
- unescapedArgClasses(i) = Class.forName(proxyName)
+ unescapedArgClasses(i) = Class.forName(proxyName)
} else {
unescapedArgs(i) = args(i)
- unescapedArgClasses(i) = argClasses(i)
+ unescapedArgClasses(i) = argClasses(i)
}
}
(unescapedArgs, unescapedArgClasses)
@@ -340,7 +336,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
try {
log.info("Creating a new remote active object [%s]", name)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
- else Class.forName(name)
+ else Class.forName(name)
val newInstance = ActiveObject.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
@@ -358,7 +354,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
- else Class.forName(name)
+ else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance._uuid = uuid
newInstance.timeout = timeout