Safekeeping

This commit is contained in:
Viktor Klang 2010-09-12 11:24:27 +02:00
parent f20e0ee030
commit a5c5efc4ff
6 changed files with 46 additions and 29 deletions

View file

@ -125,7 +125,7 @@ object ActorRegistry extends ListenerManagement {
actorsByUUID.put(actor.uuid, actor)
// notify listeners
foreachListener(_ ! ActorRegistered(actor))
notifyListeners(ActorRegistered(actor))
}
/**
@ -137,7 +137,7 @@ object ActorRegistry extends ListenerManagement {
actorsById.remove(actor.id,actor)
// notify listeners
foreachListener(_ ! ActorUnregistered(actor))
notifyListeners(ActorUnregistered(actor))
}
/**

View file

@ -45,7 +45,7 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig(
capacity = Dispatchers.MAILBOX_CAPACITY,
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),

View file

@ -40,6 +40,23 @@ trait ListenerManagement extends Logging {
if (manageLifeCycleOfListeners) listener.stop
}
/*
* Returns whether there are any listeners currently
*/
def hasListeners: Boolean = !listeners.isEmpty
protected def notifyListeners(message: => Any) {
if (hasListeners) {
val msg = message
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) listener ! msg
else log.warning("Can't notify [%s] since it is not running.", listener)
}
}
}
/**
* Execute <code>f</code> with each listener as argument.
*/

View file

@ -220,10 +220,10 @@ class RemoteClient private[akka] (
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
foreachListener(_ ! RemoteClientError(connection.getCause, this))
notifyListeners(RemoteClientError(connection.getCause, this))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
foreachListener(_ ! RemoteClientStarted(this))
notifyListeners(RemoteClientStarted(this))
isRunning = true
}
}
@ -232,7 +232,7 @@ class RemoteClient private[akka] (
log.info("Shutting down %s", name)
if (isRunning) {
isRunning = false
foreachListener(_ ! RemoteClientShutdown(this))
notifyListeners(RemoteClientShutdown(this))
timer.stop
timer = null
openChannels.close.awaitUninterruptibly
@ -250,7 +250,7 @@ class RemoteClient private[akka] (
@deprecated("Use removeListener instead")
def deregisterListener(actorRef: ActorRef) = removeListener(actorRef)
override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
protected override def manageLifeCycleOfListeners = false
@ -287,7 +287,7 @@ class RemoteClient private[akka] (
} else {
val exception = new RemoteClientException(
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
foreachListener(l => l ! RemoteClientError(exception, this))
notifyListeners(RemoteClientError(exception, this))
throw exception
}
@ -403,12 +403,12 @@ class RemoteClientHandler(
futures.remove(reply.getId)
} else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
client.foreachListener(_ ! RemoteClientError(exception, client))
client.notifyListeners(RemoteClientError(exception, client))
throw exception
}
} catch {
case e: Exception =>
client.foreachListener(_ ! RemoteClientError(e, client))
client.notifyListeners(RemoteClientError(e, client))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@ -423,7 +423,7 @@ class RemoteClientHandler(
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client))
client.notifyListeners(RemoteClientError(client.connection.getCause, client))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -433,7 +433,7 @@ class RemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.foreachListener(_ ! RemoteClientConnected(client))
client.notifyListeners(RemoteClientConnected(client))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow
}
@ -450,12 +450,12 @@ class RemoteClientHandler(
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.foreachListener(_ ! RemoteClientDisconnected(client))
client.notifyListeners(RemoteClientDisconnected(client))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.foreachListener(_ ! RemoteClientError(event.getCause, client))
client.notifyListeners(RemoteClientError(event.getCause, client))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true
Cluster.registerLocalNode(hostname, port)
foreachListener(_ ! RemoteServerStarted(this))
notifyListeners(RemoteServerStarted(this))
}
} catch {
case e =>
log.error(e, "Could not start up remote server")
foreachListener(_ ! RemoteServerError(e, this))
notifyListeners(RemoteServerError(e, this))
}
this
}
@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
foreachListener(_ ! RemoteServerShutdown(this))
notifyListeners(RemoteServerShutdown(this))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => log.warning("Could not close remote server channel in a graceful way")
@ -323,7 +323,7 @@ class RemoteServer extends Logging with ListenerManagement {
protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(address).actors
@ -413,18 +413,18 @@ class RemoteServerHandler(
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
openChannels.add(future.getChannel)
server.foreachListener(_ ! RemoteServerClientConnected(server))
server.notifyListeners(RemoteServerClientConnected(server))
} else future.getChannel.close
}
})
} else {
server.foreachListener(_ ! RemoteServerClientConnected(server))
server.notifyListeners(RemoteServerClientConnected(server))
}
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
log.debug("Remote client disconnected from [%s]", server.name)
server.foreachListener(_ ! RemoteServerClientDisconnected(server))
server.notifyListeners(RemoteServerClientDisconnected(server))
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@ -446,7 +446,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error(event.getCause, "Unexpected exception from remote downstream")
event.getChannel.close
server.foreachListener(_ ! RemoteServerError(event.getCause, server))
server.notifyListeners(RemoteServerError(event.getCause, server))
}
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
@ -491,7 +491,7 @@ class RemoteServerHandler(
} catch {
case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, true))
server.foreachListener(_ ! RemoteServerError(e, server))
server.notifyListeners(RemoteServerError(e, server))
}
}
}
@ -523,10 +523,10 @@ class RemoteServerHandler(
} catch {
case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, false))
server.foreachListener(_ ! RemoteServerError(e, server))
server.notifyListeners(RemoteServerError(e, server))
case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, false))
server.foreachListener(_ ! RemoteServerError(e, server))
server.notifyListeners(RemoteServerError(e, server))
}
}
@ -559,7 +559,7 @@ class RemoteServerHandler(
} catch {
case e =>
log.error(e, "Could not create remote actor instance")
server.foreachListener(_ ! RemoteServerError(e, server))
server.notifyListeners(RemoteServerError(e, server))
throw e
}
} else actorRefOrNull
@ -590,7 +590,7 @@ class RemoteServerHandler(
} catch {
case e =>
log.error(e, "Could not create remote typed actor instance")
server.foreachListener(_ ! RemoteServerError(e, server))
server.notifyListeners(RemoteServerError(e, server))
throw e
}
} else typedActorOrNull

View file

@ -674,7 +674,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
def register(proxy: AnyRef, init: AspectInit) = {
val res = initializations.put(proxy, init)
foreachListener(_ ! AspectInitRegistered(proxy, init))
notifyListeners(AspectInitRegistered(proxy, init))
res
}
@ -683,7 +683,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
*/
def unregister(proxy: AnyRef): AspectInit = {
val init = initializations.remove(proxy)
foreachListener(_ ! AspectInitUnregistered(proxy, init))
notifyListeners(AspectInitUnregistered(proxy, init))
init.actorRef.stop
init
}