diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index fefb1521ed..128205bf9a 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -7,7 +7,6 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} -import se.scalablesolutions.akka.util.{UUID, Logging, Duration} import se.scalablesolutions.akka.config.Config._ import org.jboss.netty.channel._ @@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty +import se.scalablesolutions.akka.util.{ListenerManagement, UUID, Logging, Duration} /** * Atomic remote request/reply message id generator. @@ -173,13 +173,12 @@ object RemoteClient extends Logging { * * @author Jonas Bonér */ -class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging { +class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging with ListenerManagement { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[String, ActorRef] - private[remote] val listeners = new ConcurrentSkipListSet[ActorRef] private val channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool, @@ -204,7 +203,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port)) + foreachListener(l => l ! RemoteClientError(connection.getCause, hostname, port)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -221,9 +220,13 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade } } - def registerListener(actorRef: ActorRef) = listeners.add(actorRef) + @deprecated("Use addListener instead") + def registerListener(actorRef: ActorRef) = addListener(actorRef) - def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef) + @deprecated("Use removeListener instead") + def deregisterListener(actorRef: ActorRef) = removeListener(actorRef) + + override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { @@ -240,7 +243,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) + foreachListener(l => l ! RemoteClientError(exception, hostname, port)) throw exception } @@ -338,12 +341,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) + client.foreachListener(l => l ! RemoteClientError(exception, client.hostname, client.port)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port)) + client.foreachListener(l => l ! RemoteClientError(e, client.hostname, client.port)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -357,7 +360,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.listeners.toArray.foreach(l => + client.foreachListener(l => l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } @@ -367,7 +370,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port)) + client.foreachListener(l => l ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } @@ -383,13 +386,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(listener => - listener.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) + client.foreachListener(l => l ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port)) + client.foreachListener(l => l ! RemoteClientError(event.getCause, client.hostname, client.port)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 942270f45f..131201d402 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -26,6 +26,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map +import reflect.BeanProperty /** * Use this object if you need a single remote server on a specific node. @@ -160,6 +161,28 @@ object RemoteServer { } } +/** + * Life-cycle events for RemoteServer. + */ +sealed trait RemoteServerLifeCycleEvent +case class RemoteServerError( + @BeanProperty val cause: Throwable, + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent +case class RemoteServerStarted( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent +/*FIXME NOT SUPPORTED YET + case class RemoteServerClientConnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent*/ + /** * Use this class if you need a more than one remote server on a specific node. * @@ -176,11 +199,11 @@ object RemoteServer { * * @author Jonas Bonér */ -class RemoteServer extends Logging { +class RemoteServer extends Logging with ListenerManagement { val name = "RemoteServer@" + hostname + ":" + port - private var hostname = RemoteServer.HOSTNAME - private var port = RemoteServer.PORT + private[akka] var hostname = RemoteServer.HOSTNAME + private[akka] var port = RemoteServer.PORT @volatile private var _isRunning = false @@ -222,7 +245,7 @@ class RemoteServer extends Logging { RemoteServer.register(hostname, port, this) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val pipelineFactory = new RemoteServerPipelineFactory( - name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors) + name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors,this) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -231,9 +254,12 @@ class RemoteServer extends Logging { openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) + foreachListener(_ ! RemoteServerStarted(hostname,port)) } } catch { - case e => log.error(e, "Could not start up remote server") + case e => + log.error(e, "Could not start up remote server") + foreachListener(_ ! RemoteServerError(e,hostname,port)) } this } @@ -246,6 +272,7 @@ class RemoteServer extends Logging { openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) + foreachListener(_ ! RemoteServerShutdown(hostname,port)) } catch { case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") @@ -302,6 +329,8 @@ class RemoteServer extends Logging { if (actorRef.registeredInRemoteNodeDuringSerialization) actors.remove(actorRef.uuid) } } + + protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) } object RemoteServerSslContext { @@ -327,7 +356,8 @@ class RemoteServerPipelineFactory( val openChannels: ChannelGroup, val loader: Option[ClassLoader], val actors: JMap[String, ActorRef], - val typedActors: JMap[String, AnyRef]) extends ChannelPipelineFactory { + val typedActors: JMap[String, AnyRef], + val server: RemoteServer) extends ChannelPipelineFactory { import RemoteServer._ def getPipeline: ChannelPipeline = { @@ -347,7 +377,7 @@ class RemoteServerPipelineFactory( case _ => (join(), join()) } - val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors) + val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors,server) val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) new StaticChannelPipeline(stages: _*) } @@ -362,7 +392,8 @@ class RemoteServerHandler( val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val actors: JMap[String, ActorRef], - val typedActors: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging { + val typedActors: JMap[String, AnyRef], + val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -406,9 +437,9 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - event.getCause.printStackTrace log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close + server.foreachListener(_ ! RemoteServerError(event.getCause,server.hostname,server.port)) } private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { @@ -424,8 +455,7 @@ class RemoteServerHandler( val actorInfo = request.getActorInfo log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid) - val actorRef = createActor(actorInfo) - actorRef.start + val actorRef = createActor(actorInfo).start val message = MessageSerializer.deserialize(request.getMessage) val sender = @@ -452,7 +482,9 @@ class RemoteServerHandler( channel.write(replyBuilder.build) } catch { - case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) + case e: Throwable => + channel.write(createErrorReplyMessage(e, request, true)) + server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) } } } @@ -482,8 +514,12 @@ class RemoteServerHandler( channel.write(replyBuilder.build) } } catch { - case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) - case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) + case e: InvocationTargetException => + channel.write(createErrorReplyMessage(e.getCause, request, false)) + server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + case e: Throwable => + channel.write(createErrorReplyMessage(e, request, false)) + server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) } } @@ -515,6 +551,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote actor instance") + server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) throw e } } else actorRefOrNull @@ -522,7 +559,7 @@ class RemoteServerHandler( private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { val uuid = actorInfo.getUuid - val typedActorOrNull = typedActors.get(uuid) + val typedActorOrNull = typedActors get uuid if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo @@ -542,7 +579,10 @@ class RemoteServerHandler( typedActors.put(uuid, newInstance) newInstance } catch { - case e => log.error(e, "Could not create remote typed actor instance"); throw e + case e => + log.error(e, "Could not create remote typed actor instance") + server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + throw e } } else typedActorOrNull } diff --git a/akka-core/src/main/scala/util/ListenerManagement.scala b/akka-core/src/main/scala/util/ListenerManagement.scala index 8b53cc7d0a..cfcb5ac2b6 100644 --- a/akka-core/src/main/scala/util/ListenerManagement.scala +++ b/akka-core/src/main/scala/util/ListenerManagement.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.util -import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ConcurrentSkipListSet import se.scalablesolutions.akka.actor.ActorRef @@ -15,7 +15,7 @@ import se.scalablesolutions.akka.actor.ActorRef */ trait ListenerManagement extends Logging { - private val listeners = new CopyOnWriteArrayList[ActorRef] + private val listeners = new ConcurrentSkipListSet[ActorRef] /** * Adds the listener this this registry's listener list. @@ -23,7 +23,7 @@ trait ListenerManagement extends Logging { */ def addListener(listener: ActorRef) = { listener.start - listeners.add(listener) + listeners add listener } /** @@ -31,7 +31,7 @@ trait ListenerManagement extends Logging { * The listener is stopped by this method. */ def removeListener(listener: ActorRef) = { - listeners.remove(listener) + listeners remove listener listener.stop }