diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index f3a479e6fd..51bbfd3477 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement { actorsByUUID.put(actor.uuid, actor) // notify listeners - foreachListener(_ ! ActorRegistered(actor)) + notifyListeners(ActorRegistered(actor)) } /** @@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement { actorsById.remove(actor.id,actor) // notify listeners - foreachListener(_ ! ActorUnregistered(actor)) + notifyListeners(ActorUnregistered(actor)) } /** diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 9a7e44a197..2ebba03928 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -45,7 +45,7 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID} */ object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MAILBOX_CONFIG = MailboxConfig( capacity = Dispatchers.MAILBOX_CAPACITY, pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)), diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 19045e123b..5ace496947 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -83,20 +83,32 @@ class ExecutorBasedEventDrivenDispatcher( /** * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox */ - trait ExecutableMailbox { self: MessageQueue with Runnable => + trait ExecutableMailbox extends Runnable { self: MessageQueue => def run = { - try { - val reDispatch = processMailbox()//Returns true if we need to reschedule the processing - self.dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing - if (reDispatch) - dispatch(self) - } catch { - case e => - dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing - if(!self.isEmpty) //If the mailbox isn't empty, try to re-schedule processing, equivalent to reDispatch - dispatch(self) - throw e //Can't just swallow exceptions or errors - } + var lockAcquiredOnce = false + var finishedBeforeMailboxEmpty = false + // this do-while loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + finishedBeforeMailboxEmpty = false //Reset this every run + if (dispatcherLock.tryLock()) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + lockAcquiredOnce = true + finishedBeforeMailboxEmpty = try { + processMailbox() + } catch { + case e => + dispatcherLock.unlock() + if (!self.isEmpty) + registerForExecution(self) + throw e + } + + dispatcherLock.unlock() + if (finishedBeforeMailboxEmpty) + registerForExecution(self) + } + } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty)) } /** @@ -129,28 +141,25 @@ class ExecutorBasedEventDrivenDispatcher( def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation - dispatch(mbox) + registerForExecution(mbox) } /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with Runnable] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = - if (mailboxCapacity > 0) new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with Runnable with ExecutableMailbox - else new DefaultUnboundedMessageQueue(blockDequeue = false) with Runnable with ExecutableMailbox + override def createMailbox(actorRef: ActorRef): AnyRef = { + if (mailboxCapacity > 0) + new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with ExecutableMailbox + else + new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox + } - def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { - if (mailbox.dispatcherLock.tryLock()) {//Ensure that only one runnable can be in the executor pool at the same time - try { - executor execute mailbox - } catch { - case e: RejectedExecutionException => mailbox.dispatcherLock.unlock() - } - } + protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + executor execute mailbox } else { log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) } @@ -167,8 +176,10 @@ class ExecutorBasedEventDrivenDispatcher( uuids.clear } - def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( + def ensureNotActive(): Unit = if (active) { + throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + } override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" diff --git a/akka-actor/src/main/scala/util/ListenerManagement.scala b/akka-actor/src/main/scala/util/ListenerManagement.scala index 0e17058380..7ad0f451f1 100644 --- a/akka-actor/src/main/scala/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/util/ListenerManagement.scala @@ -40,6 +40,23 @@ trait ListenerManagement extends Logging { if (manageLifeCycleOfListeners) listener.stop } + /* + * Returns whether there are any listeners currently + */ + def hasListeners: Boolean = !listeners.isEmpty + + protected def notifyListeners(message: => Any) { + if (hasListeners) { + val msg = message + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) listener ! msg + else log.warning("Can't notify [%s] since it is not running.", listener) + } + } + } + /** * Execute f with each listener as argument. */ diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index f61a5d63a1..264081259f 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -220,10 +220,10 @@ class RemoteClient private[akka] ( val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - foreachListener(_ ! RemoteClientError(connection.getCause, this)) + notifyListeners(RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } - foreachListener(_ ! RemoteClientStarted(this)) + notifyListeners(RemoteClientStarted(this)) isRunning = true } } @@ -232,7 +232,7 @@ class RemoteClient private[akka] ( log.info("Shutting down %s", name) if (isRunning) { isRunning = false - foreachListener(_ ! RemoteClientShutdown(this)) + notifyListeners(RemoteClientShutdown(this)) timer.stop timer = null openChannels.close.awaitUninterruptibly @@ -250,7 +250,7 @@ class RemoteClient private[akka] ( @deprecated("Use removeListener instead") def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) - override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected override def manageLifeCycleOfListeners = false @@ -287,7 +287,7 @@ class RemoteClient private[akka] ( } else { val exception = new RemoteClientException( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - foreachListener(l => l ! RemoteClientError(exception, this)) + notifyListeners(RemoteClientError(exception, this)) throw exception } @@ -403,12 +403,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.foreachListener(_ ! RemoteClientError(exception, client)) + client.notifyListeners(RemoteClientError(exception, client)) throw exception } } catch { case e: Exception => - client.foreachListener(_ ! RemoteClientError(e, client)) + client.notifyListeners(RemoteClientError(e, client)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -423,7 +423,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) + client.notifyListeners(RemoteClientError(client.connection.getCause, client)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -433,7 +433,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.foreachListener(_ ! RemoteClientConnected(client)) + client.notifyListeners(RemoteClientConnected(client)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -450,12 +450,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.foreachListener(_ ! RemoteClientDisconnected(client)) + client.notifyListeners(RemoteClientDisconnected(client)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.foreachListener(_ ! RemoteClientError(event.getCause, client)) + client.notifyListeners(RemoteClientError(event.getCause, client)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index b10d8e5825..27b5a50bfc 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) - foreachListener(_ ! RemoteServerStarted(this)) + notifyListeners(RemoteServerStarted(this)) } } catch { case e => log.error(e, "Could not start up remote server") - foreachListener(_ ! RemoteServerError(e, this)) + notifyListeners(RemoteServerError(e, this)) } this } @@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) - foreachListener(_ ! RemoteServerShutdown(this)) + notifyListeners(RemoteServerShutdown(this)) } catch { case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") @@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement { protected override def manageLifeCycleOfListeners = false - protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { RemoteServer.actorsFor(address).actors @@ -413,18 +413,18 @@ class RemoteServerHandler( def operationComplete(future: ChannelFuture): Unit = { if (future.isSuccess) { openChannels.add(future.getChannel) - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } else future.getChannel.close } }) } else { - server.foreachListener(_ ! RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server)) } } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { log.debug("Remote client disconnected from [%s]", server.name) - server.foreachListener(_ ! RemoteServerClientDisconnected(server)) + server.notifyListeners(RemoteServerClientDisconnected(server)) } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -446,7 +446,7 @@ class RemoteServerHandler( override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close - server.foreachListener(_ ! RemoteServerError(event.getCause, server)) + server.notifyListeners(RemoteServerError(event.getCause, server)) } private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { @@ -491,7 +491,7 @@ class RemoteServerHandler( } catch { case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } } @@ -523,10 +523,10 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) } } @@ -559,7 +559,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else actorRefOrNull @@ -590,7 +590,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote typed actor instance") - server.foreachListener(_ ! RemoteServerError(e, server)) + server.notifyListeners(RemoteServerError(e, server)) throw e } } else typedActorOrNull diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index b27f5b4b4d..52bd0e6cb6 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { def register(proxy: AnyRef, init: AspectInit) = { val res = initializations.put(proxy, init) - foreachListener(_ ! AspectInitRegistered(proxy, init)) + notifyListeners(AspectInitRegistered(proxy, init)) res } @@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { */ def unregister(proxy: AnyRef): AspectInit = { val init = initializations.remove(proxy) - foreachListener(_ ! AspectInitUnregistered(proxy, init)) + notifyListeners(AspectInitUnregistered(proxy, init)) init.actorRef.stop init }