Improving latency in EBEDD

This commit is contained in:
Viktor Klang 2010-09-12 11:24:53 +02:00
commit 49f6b382d6
7 changed files with 84 additions and 56 deletions

View file

@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement {
actorsByUUID.put(actor.uuid, actor) actorsByUUID.put(actor.uuid, actor)
// notify listeners // notify listeners
foreachListener(_ ! ActorRegistered(actor)) notifyListeners(ActorRegistered(actor))
} }
/** /**
@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement {
actorsById.remove(actor.id,actor) actorsById.remove(actor.id,actor)
// notify listeners // notify listeners
foreachListener(_ ! ActorUnregistered(actor)) notifyListeners(ActorUnregistered(actor))
} }
/** /**

View file

@ -45,7 +45,7 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
*/ */
object Dispatchers extends Logging { object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5) val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig( val MAILBOX_CONFIG = MailboxConfig(
capacity = Dispatchers.MAILBOX_CAPACITY, capacity = Dispatchers.MAILBOX_CAPACITY,
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)), pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),

View file

@ -83,20 +83,32 @@ class ExecutorBasedEventDrivenDispatcher(
/** /**
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox
*/ */
trait ExecutableMailbox { self: MessageQueue with Runnable => trait ExecutableMailbox extends Runnable { self: MessageQueue =>
def run = { def run = {
try { var lockAcquiredOnce = false
val reDispatch = processMailbox()//Returns true if we need to reschedule the processing var finishedBeforeMailboxEmpty = false
self.dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing // this do-while loop is required to prevent missing new messages between the end of the inner while
if (reDispatch) // loop and releasing the lock
dispatch(self) do {
} catch { finishedBeforeMailboxEmpty = false //Reset this every run
case e => if (dispatcherLock.tryLock()) {
dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing // Only dispatch if we got the lock. Otherwise another thread is already dispatching.
if(!self.isEmpty) //If the mailbox isn't empty, try to re-schedule processing, equivalent to reDispatch lockAcquiredOnce = true
dispatch(self) finishedBeforeMailboxEmpty = try {
throw e //Can't just swallow exceptions or errors processMailbox()
} } catch {
case e =>
dispatcherLock.unlock()
if (!self.isEmpty)
registerForExecution(self)
throw e
}
dispatcherLock.unlock()
if (finishedBeforeMailboxEmpty)
registerForExecution(self)
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty))
} }
/** /**
@ -129,28 +141,25 @@ class ExecutorBasedEventDrivenDispatcher(
def dispatch(invocation: MessageInvocation) = { def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver) val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation mbox enqueue invocation
dispatch(mbox) registerForExecution(mbox)
} }
/** /**
* @return the mailbox associated with the actor * @return the mailbox associated with the actor
*/ */
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with Runnable] private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity > 0) new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with Runnable with ExecutableMailbox if (mailboxCapacity > 0)
else new DefaultUnboundedMessageQueue(blockDequeue = false) with Runnable with ExecutableMailbox new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with ExecutableMailbox
else
new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox
}
def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) { protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
if (mailbox.dispatcherLock.tryLock()) {//Ensure that only one runnable can be in the executor pool at the same time executor execute mailbox
try {
executor execute mailbox
} catch {
case e: RejectedExecutionException => mailbox.dispatcherLock.unlock()
}
}
} else { } else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
} }
@ -167,8 +176,10 @@ class ExecutorBasedEventDrivenDispatcher(
uuids.clear uuids.clear
} }
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( def ensureNotActive(): Unit = if (active) {
throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
}
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"

View file

@ -40,6 +40,23 @@ trait ListenerManagement extends Logging {
if (manageLifeCycleOfListeners) listener.stop if (manageLifeCycleOfListeners) listener.stop
} }
/*
* Returns whether there are any listeners currently
*/
def hasListeners: Boolean = !listeners.isEmpty
protected def notifyListeners(message: => Any) {
if (hasListeners) {
val msg = message
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) listener ! msg
else log.warning("Can't notify [%s] since it is not running.", listener)
}
}
}
/** /**
* Execute <code>f</code> with each listener as argument. * Execute <code>f</code> with each listener as argument.
*/ */

View file

@ -220,10 +220,10 @@ class RemoteClient private[akka] (
val channel = connection.awaitUninterruptibly.getChannel val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel) openChannels.add(channel)
if (!connection.isSuccess) { if (!connection.isSuccess) {
foreachListener(_ ! RemoteClientError(connection.getCause, this)) notifyListeners(RemoteClientError(connection.getCause, this))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
} }
foreachListener(_ ! RemoteClientStarted(this)) notifyListeners(RemoteClientStarted(this))
isRunning = true isRunning = true
} }
} }
@ -232,7 +232,7 @@ class RemoteClient private[akka] (
log.info("Shutting down %s", name) log.info("Shutting down %s", name)
if (isRunning) { if (isRunning) {
isRunning = false isRunning = false
foreachListener(_ ! RemoteClientShutdown(this)) notifyListeners(RemoteClientShutdown(this))
timer.stop timer.stop
timer = null timer = null
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
@ -250,7 +250,7 @@ class RemoteClient private[akka] (
@deprecated("Use removeListener instead") @deprecated("Use removeListener instead")
def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) def deregisterListener(actorRef: ActorRef) = removeListener(actorRef)
override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
protected override def manageLifeCycleOfListeners = false protected override def manageLifeCycleOfListeners = false
@ -287,7 +287,7 @@ class RemoteClient private[akka] (
} else { } else {
val exception = new RemoteClientException( val exception = new RemoteClientException(
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
foreachListener(l => l ! RemoteClientError(exception, this)) notifyListeners(RemoteClientError(exception, this))
throw exception throw exception
} }
@ -403,12 +403,12 @@ class RemoteClientHandler(
futures.remove(reply.getId) futures.remove(reply.getId)
} else { } else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
client.foreachListener(_ ! RemoteClientError(exception, client)) client.notifyListeners(RemoteClientError(exception, client))
throw exception throw exception
} }
} catch { } catch {
case e: Exception => case e: Exception =>
client.foreachListener(_ ! RemoteClientError(e, client)) client.notifyListeners(RemoteClientError(e, client))
log.error("Unexpected exception in remote client handler: %s", e) log.error("Unexpected exception in remote client handler: %s", e)
throw e throw e
} }
@ -423,7 +423,7 @@ class RemoteClientHandler(
client.connection = bootstrap.connect(remoteAddress) client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) { if (!client.connection.isSuccess) {
client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) client.notifyListeners(RemoteClientError(client.connection.getCause, client))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
} }
} }
@ -433,7 +433,7 @@ class RemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = { def connect = {
client.foreachListener(_ ! RemoteClientConnected(client)) client.notifyListeners(RemoteClientConnected(client))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow client.resetReconnectionTimeWindow
} }
@ -450,12 +450,12 @@ class RemoteClientHandler(
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.foreachListener(_ ! RemoteClientDisconnected(client)) client.notifyListeners(RemoteClientDisconnected(client))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
} }
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.foreachListener(_ ! RemoteClientError(event.getCause, client)) client.notifyListeners(RemoteClientError(event.getCause, client))
log.error(event.getCause, "Unexpected exception from downstream in remote client") log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close event.getChannel.close
} }

View file

@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true _isRunning = true
Cluster.registerLocalNode(hostname, port) Cluster.registerLocalNode(hostname, port)
foreachListener(_ ! RemoteServerStarted(this)) notifyListeners(RemoteServerStarted(this))
} }
} catch { } catch {
case e => case e =>
log.error(e, "Could not start up remote server") log.error(e, "Could not start up remote server")
foreachListener(_ ! RemoteServerError(e, this)) notifyListeners(RemoteServerError(e, this))
} }
this this
} }
@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port) Cluster.deregisterLocalNode(hostname, port)
foreachListener(_ ! RemoteServerShutdown(this)) notifyListeners(RemoteServerShutdown(this))
} catch { } catch {
case e: java.nio.channels.ClosedChannelException => {} case e: java.nio.channels.ClosedChannelException => {}
case e => log.warning("Could not close remote server channel in a graceful way") case e => log.warning("Could not close remote server channel in a graceful way")
@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement {
protected override def manageLifeCycleOfListeners = false protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = { private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(address).actors RemoteServer.actorsFor(address).actors
@ -413,18 +413,18 @@ class RemoteServerHandler(
def operationComplete(future: ChannelFuture): Unit = { def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) { if (future.isSuccess) {
openChannels.add(future.getChannel) openChannels.add(future.getChannel)
server.foreachListener(_ ! RemoteServerClientConnected(server)) server.notifyListeners(RemoteServerClientConnected(server))
} else future.getChannel.close } else future.getChannel.close
} }
}) })
} else { } else {
server.foreachListener(_ ! RemoteServerClientConnected(server)) server.notifyListeners(RemoteServerClientConnected(server))
} }
} }
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
log.debug("Remote client disconnected from [%s]", server.name) log.debug("Remote client disconnected from [%s]", server.name)
server.foreachListener(_ ! RemoteServerClientDisconnected(server)) server.notifyListeners(RemoteServerClientDisconnected(server))
} }
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@ -446,7 +446,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error(event.getCause, "Unexpected exception from remote downstream") log.error(event.getCause, "Unexpected exception from remote downstream")
event.getChannel.close event.getChannel.close
server.foreachListener(_ ! RemoteServerError(event.getCause, server)) server.notifyListeners(RemoteServerError(event.getCause, server))
} }
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
@ -491,7 +491,7 @@ class RemoteServerHandler(
} catch { } catch {
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, true)) channel.write(createErrorReplyMessage(e, request, true))
server.foreachListener(_ ! RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
} }
} }
@ -523,10 +523,10 @@ class RemoteServerHandler(
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, false)) channel.write(createErrorReplyMessage(e.getCause, request, false))
server.foreachListener(_ ! RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, false)) channel.write(createErrorReplyMessage(e, request, false))
server.foreachListener(_ ! RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
} }
@ -559,7 +559,7 @@ class RemoteServerHandler(
} catch { } catch {
case e => case e =>
log.error(e, "Could not create remote actor instance") log.error(e, "Could not create remote actor instance")
server.foreachListener(_ ! RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
throw e throw e
} }
} else actorRefOrNull } else actorRefOrNull
@ -590,7 +590,7 @@ class RemoteServerHandler(
} catch { } catch {
case e => case e =>
log.error(e, "Could not create remote typed actor instance") log.error(e, "Could not create remote typed actor instance")
server.foreachListener(_ ! RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
throw e throw e
} }
} else typedActorOrNull } else typedActorOrNull

View file

@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
def register(proxy: AnyRef, init: AspectInit) = { def register(proxy: AnyRef, init: AspectInit) = {
val res = initializations.put(proxy, init) val res = initializations.put(proxy, init)
foreachListener(_ ! AspectInitRegistered(proxy, init)) notifyListeners(AspectInitRegistered(proxy, init))
res res
} }
@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
*/ */
def unregister(proxy: AnyRef): AspectInit = { def unregister(proxy: AnyRef): AspectInit = {
val init = initializations.remove(proxy) val init = initializations.remove(proxy)
foreachListener(_ ! AspectInitUnregistered(proxy, init)) notifyListeners(AspectInitUnregistered(proxy, init))
init.actorRef.stop init.actorRef.stop
init init
} }