diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 94b60cbfc1..7764469b62 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -11,6 +11,8 @@ import akka.util._ import akka.dispatch.CompletableFuture import akka.config.Config.{config, TIME_UNIT} import java.util.concurrent.ConcurrentHashMap +import akka.AkkaException +import reflect.BeanProperty trait RemoteModule extends Logging { val UUID_PREFIX = "uuid:" @@ -56,6 +58,57 @@ trait RemoteModule extends Logging { } } +/** + * Life-cycle events for RemoteClient. + */ +sealed trait RemoteClientLifeCycleEvent //TODO: REVISIT: Document change from RemoteClient to RemoteClientModule + remoteAddress +case class RemoteClientError( + @BeanProperty val cause: Throwable, + @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientStarted( + @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientShutdown( + @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + + +/** + * Life-cycle events for RemoteServer. + */ +sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule +case class RemoteServerStarted( + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown( + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerError( + @BeanProperty val cause: Throwable, + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerClientConnected( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientClosed( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + +/** + * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. + */ +class RemoteClientException private[akka] (message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress) extends AkkaException(message) + +/** + * Returned when a remote exception cannot be instantiated or parsed + */ +case class UnparsableException private[akka] (originalClassName: String, + originalMessage: String) extends AkkaException(originalMessage) + abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { def shutdown { diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index b9e8e511de..cd81c337c0 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -11,7 +11,6 @@ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization import akka.japi.Creator -import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule} import akka.config.Config._ import akka.serialization.RemoteActorSerialization._ import akka.AkkaException @@ -37,32 +36,7 @@ import scala.reflect.BeanProperty import java.lang.reflect.InvocationTargetException import akka.actor. {ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} - -/** - * Life-cycle events for RemoteClient. - */ -sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError( - @BeanProperty val cause: Throwable, - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientStarted( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent -case class RemoteClientShutdown( - @BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent - -/** - * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. - */ -class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) - -/** - * Returned when a remote exception cannot be instantiated or parsed - */ -case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage) +import akka.remoteinterface._ /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. @@ -96,7 +70,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY if (remoteClients.contains(hash)) remoteClients(hash) else { - val client = new RemoteClient(hostname, port, loader, self.notifyListeners _) + val client = new RemoteClient(this, new InetSocketAddress(hostname, port), loader, self.notifyListeners _) client.connect remoteClients += hash -> client client @@ -165,18 +139,16 @@ object RemoteClient { * @author Jonas Bonér */ class RemoteClient private[akka] ( - val hostname: String, - val port: Int, + val module: NettyRemoteClientModule, + val remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, val notifyListeners: (=> Any) => Unit) extends Logging { - val name = "RemoteClient@" + hostname + "::" + port + val name = "RemoteClient@" + remoteAddress.getHostName + "::" + remoteAddress.getPort //FIXME Should these be clear:ed on postStop? private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - private val remoteAddress = new InetSocketAddress(hostname, port) - //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @@ -205,7 +177,7 @@ class RemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) - log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port) + log.slf4j.info("Starting remote client connection to [{}]", remoteAddress) // Wait until the connection attempt succeeds or fails. connection = bootstrap.connect(remoteAddress) @@ -213,16 +185,16 @@ class RemoteClient private[akka] ( openChannels.add(channel) if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, this)) - log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port) + notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) + log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress) log.slf4j.debug("Remote client connection failed", connection.getCause) } - notifyListeners(RemoteClientStarted(this)) + notifyListeners(RemoteClientStarted(module, remoteAddress)) } def shutdown = runSwitch switchOff { log.slf4j.info("Shutting down {}", name) - notifyListeners(RemoteClientShutdown(this)) + notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop timer = null openChannels.close.awaitUninterruptibly @@ -276,8 +248,8 @@ class RemoteClient private[akka] ( Some(futureResult) } } else { - val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - notifyListeners(RemoteClientError(exception, this)) + val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) + notifyListeners(RemoteClientError(exception, module, remoteAddress)) throw exception } } @@ -399,11 +371,11 @@ class RemoteClientHandler( } case other => - throw new RemoteClientException("Unknown message received in remote client handler: " + other, client) + throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) } } catch { case e: Exception => - client.notifyListeners(RemoteClientError(e, client)) + client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) log.slf4j.error("Unexpected exception in remote client handler", e) throw e } @@ -419,7 +391,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.notifyListeners(RemoteClientError(client.connection.getCause, client)) + client.notifyListeners(RemoteClientError(client.connection.getCause, client.module, client.remoteAddress)) log.slf4j.error("Reconnection to [{}] has failed", remoteAddress) log.slf4j.debug("Reconnection failed", client.connection.getCause) } @@ -430,7 +402,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.notifyListeners(RemoteClientConnected(client)) + client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -440,19 +412,19 @@ class RemoteClientHandler( sslHandler.handshake.addListener(new ChannelFutureListener { def operationComplete(future: ChannelFuture): Unit = { if (future.isSuccess) connect - else throw new RemoteClientException("Could not establish SSL handshake", client) + else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress) } }) } else connect } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.notifyListeners(RemoteClientDisconnected(client)) + client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.notifyListeners(RemoteClientError(event.getCause, client)) + client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress)) if (event.getCause ne null) log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause) else @@ -530,27 +502,6 @@ object RemoteServer { } } -/** - * Life-cycle events for RemoteServer. - */ -sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule -case class RemoteServerStarted( - @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent -case class RemoteServerShutdown( - @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent -case class RemoteServerError( - @BeanProperty val cause: Throwable, - @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent -case class RemoteServerClientConnected( - @BeanProperty val server: RemoteServerModule, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent -case class RemoteServerClientDisconnected( - @BeanProperty val server: RemoteServerModule, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent -case class RemoteServerClientClosed( - @BeanProperty val server: RemoteServerModule, - @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent - /** * Provides the implementation of the Netty remote support @@ -1110,7 +1061,6 @@ class RemoteServerHandler( val id = actorInfo.getId val sessionActorRefOrNull = findSessionActor(id, channel) if (sessionActorRefOrNull ne null) { - log.slf4j.debug("Found session actor with id {} for channel {} = {}",Array[AnyRef](id, channel, sessionActorRefOrNull)) sessionActorRefOrNull } else { // we dont have it in the session either, see if we have a factory for it @@ -1170,9 +1120,7 @@ class RemoteServerHandler( if (actorRefOrNull ne null) actorRefOrNull - else - { - // the actor has not been registered globally. See if we have it in the session + else { // the actor has not been registered globally. See if we have it in the session val sessionActorRefOrNull = createSessionActor(actorInfo, channel) if (sessionActorRefOrNull ne null) sessionActorRefOrNull