diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 41691b4326..c14ff00c3c 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -67,6 +67,7 @@ trait MessageQueue { } /* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout + * (If capacity > 0) */ case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration]) diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 5c06f058f9..599bac87fd 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -85,9 +85,7 @@ trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[Message trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] { final override def append(invocation: MessageInvocation): Unit = { - if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer - if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting - throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") - } + if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") } } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 9657ad3fe4..5ad1b89aca 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -46,7 +46,7 @@ trait ThreadPoolBuilder extends Logging { if (boundedExecutorBound > 0) { val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound) - boundedExecutorBound = -1 + boundedExecutorBound = -1 //Why is this here? executor = boundedExecutor } else { executor = threadPoolBuilder @@ -208,7 +208,7 @@ trait ThreadPoolBuilder extends Logging { /** * @author Jonas Bonér */ - class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService { + class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging { protected val semaphore = new Semaphore(bound) def execute(command: Runnable) = { @@ -226,6 +226,9 @@ trait ThreadPoolBuilder extends Logging { } catch { case e: RejectedExecutionException => semaphore.release + case e => + log.error(e,"Unexpected exception") + throw e } } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 5f24def4f5..142670a84a 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -193,8 +193,10 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer class RemoteServer extends Logging with ListenerManagement { def name = "RemoteServer@" + hostname + ":" + port - private[akka] var hostname = RemoteServer.HOSTNAME - private[akka] var port = RemoteServer.PORT + private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) + + def hostname = address.hostname + def port = address.port @volatile private var _isRunning = false @@ -230,12 +232,11 @@ class RemoteServer extends Logging with ListenerManagement { private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { try { if (!_isRunning) { - hostname = _hostname - port = _port + address = RemoteServer.Address(_hostname,_port) log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) val pipelineFactory = new RemoteServerPipelineFactory( - name, openChannels, loader, actors, typedActors, this) + name, openChannels, loader, this) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -273,9 +274,9 @@ class RemoteServer extends Logging with ListenerManagement { // TODO: register typed actor in RemoteServer as well /** - * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. + * Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already. */ - def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef) + def register(actorRef: ActorRef): Unit = register(actorRef.uuid,actorRef) /** * Register Remote Actor by a specific 'id' passed as argument. @@ -284,24 +285,24 @@ class RemoteServer extends Logging with ListenerManagement { */ def register(id: String, actorRef: ActorRef): Unit = synchronized { if (_isRunning) { - val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - if (!actors.contains(id)) { + val actorMap = actors() + if (!actorMap.contains(id)) { if (!actorRef.isRunning) actorRef.start log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) - actors.put(id, actorRef) + actorMap.put(id, actorRef) } } } /** - * Unregister Remote Actor that is registered using its 'id' field (not custom ID). + * Unregister Remote Actor that is registered using its 'uuid' field (not custom ID). */ def unregister(actorRef: ActorRef):Unit = synchronized { if (_isRunning) { - log.debug("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) - val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - actors.remove(actorRef.id) - if (actorRef.registeredInRemoteNodeDuringSerialization) actors.remove(actorRef.uuid) + log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) + val actorMap = actors() + actorMap remove actorRef.uuid + if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid } } @@ -313,10 +314,10 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote actor with id [%s]", id) - val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors - val actorRef = actors.get(id) - actors.remove(id) - if (actorRef.registeredInRemoteNodeDuringSerialization) actors.remove(actorRef.uuid) + val actorMap = actors() + val actorRef = actorMap get id + actorMap remove id + if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid } } @@ -324,11 +325,11 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) - private def actors() : ConcurrentHashMap[String, ActorRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { + RemoteServer.actorsFor(address).actors } - private def typedActors() : ConcurrentHashMap[String, AnyRef] = { - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = { + RemoteServer.actorsFor(address).typedActors } } @@ -354,8 +355,6 @@ class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, val loader: Option[ClassLoader], - val actors: (() => ConcurrentHashMap[String, ActorRef]), - val typedActors: (() => ConcurrentHashMap[String, AnyRef]), val server: RemoteServer) extends ChannelPipelineFactory { import RemoteServer._ @@ -379,7 +378,7 @@ class RemoteServerPipelineFactory( case _ => (join(), join()) } - val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors, server) + val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) new StaticChannelPipeline(stages: _*) } @@ -393,8 +392,6 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val actors: (() => ConcurrentHashMap[String, ActorRef]), - val typedActors: (() => ConcurrentHashMap[String, AnyRef]), val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern @@ -545,7 +542,7 @@ class RemoteServerHandler( val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val registeredActors = actors() + val registeredActors = server.actors() val actorRefOrNull = registeredActors get uuid if (actorRefOrNull eq null) { @@ -570,7 +567,7 @@ class RemoteServerHandler( private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { val uuid = actorInfo.getUuid - val registeredTypedActors = typedActors() + val registeredTypedActors = server.typedActors() val typedActorOrNull = registeredTypedActors get uuid if (typedActorOrNull eq null) {