Moving shared remote classes into RemoteInterface

This commit is contained in:
Viktor Klang 2010-12-29 16:08:43 +01:00
parent c120589684
commit 236eecebcf
2 changed files with 73 additions and 72 deletions

View file

@ -11,6 +11,8 @@ import akka.util._
import akka.dispatch.CompletableFuture import akka.dispatch.CompletableFuture
import akka.config.Config.{config, TIME_UNIT} import akka.config.Config.{config, TIME_UNIT}
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.AkkaException
import reflect.BeanProperty
trait RemoteModule extends Logging { trait RemoteModule extends Logging {
val UUID_PREFIX = "uuid:" val UUID_PREFIX = "uuid:"
@ -56,6 +58,57 @@ trait RemoteModule extends Logging {
} }
} }
/**
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent //TODO: REVISIT: Document change from RemoteClient to RemoteClientModule + remoteAddress
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
/**
* Life-cycle events for RemoteServer.
*/
sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule
case class RemoteServerStarted(
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerShutdown(
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerError(
@BeanProperty val cause: Throwable,
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerClientConnected(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientClosed(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String,
@BeanProperty val client: RemoteClientModule,
val remoteAddress: InetSocketAddress) extends AkkaException(message)
/**
* Returned when a remote exception cannot be instantiated or parsed
*/
case class UnparsableException private[akka] (originalClassName: String,
originalMessage: String) extends AkkaException(originalMessage)
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
def shutdown { def shutdown {

View file

@ -11,7 +11,6 @@ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization
import akka.japi.Creator import akka.japi.Creator
import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule}
import akka.config.Config._ import akka.config.Config._
import akka.serialization.RemoteActorSerialization._ import akka.serialization.RemoteActorSerialization._
import akka.AkkaException import akka.AkkaException
@ -37,32 +36,7 @@ import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import akka.actor. {ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.actor. {ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
import akka.remoteinterface._
/**
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
/**
* Returned when a remote exception cannot be instantiated or parsed
*/
case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage)
/** /**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
@ -96,7 +70,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
if (remoteClients.contains(hash)) remoteClients(hash) if (remoteClients.contains(hash)) remoteClients(hash)
else { else {
val client = new RemoteClient(hostname, port, loader, self.notifyListeners _) val client = new RemoteClient(this, new InetSocketAddress(hostname, port), loader, self.notifyListeners _)
client.connect client.connect
remoteClients += hash -> client remoteClients += hash -> client
client client
@ -165,18 +139,16 @@ object RemoteClient {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteClient private[akka] ( class RemoteClient private[akka] (
val hostname: String, val module: NettyRemoteClientModule,
val port: Int, val remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, val loader: Option[ClassLoader] = None,
val notifyListeners: (=> Any) => Unit) extends Logging { val notifyListeners: (=> Any) => Unit) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port val name = "RemoteClient@" + remoteAddress.getHostName + "::" + remoteAddress.getPort
//FIXME Should these be clear:ed on postStop? //FIXME Should these be clear:ed on postStop?
private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[Uuid, ActorRef] private val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private val remoteAddress = new InetSocketAddress(hostname, port)
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile @volatile
private var bootstrap: ClientBootstrap = _ private var bootstrap: ClientBootstrap = _
@ -205,7 +177,7 @@ class RemoteClient private[akka] (
bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true) bootstrap.setOption("keepAlive", true)
log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port) log.slf4j.info("Starting remote client connection to [{}]", remoteAddress)
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress) connection = bootstrap.connect(remoteAddress)
@ -213,16 +185,16 @@ class RemoteClient private[akka] (
openChannels.add(channel) openChannels.add(channel)
if (!connection.isSuccess) { if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, this)) notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port) log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress)
log.slf4j.debug("Remote client connection failed", connection.getCause) log.slf4j.debug("Remote client connection failed", connection.getCause)
} }
notifyListeners(RemoteClientStarted(this)) notifyListeners(RemoteClientStarted(module, remoteAddress))
} }
def shutdown = runSwitch switchOff { def shutdown = runSwitch switchOff {
log.slf4j.info("Shutting down {}", name) log.slf4j.info("Shutting down {}", name)
notifyListeners(RemoteClientShutdown(this)) notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop timer.stop
timer = null timer = null
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
@ -276,8 +248,8 @@ class RemoteClient private[akka] (
Some(futureResult) Some(futureResult)
} }
} else { } else {
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
notifyListeners(RemoteClientError(exception, this)) notifyListeners(RemoteClientError(exception, module, remoteAddress))
throw exception throw exception
} }
} }
@ -399,11 +371,11 @@ class RemoteClientHandler(
} }
case other => case other =>
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client) throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
} }
} catch { } catch {
case e: Exception => case e: Exception =>
client.notifyListeners(RemoteClientError(e, client)) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
log.slf4j.error("Unexpected exception in remote client handler", e) log.slf4j.error("Unexpected exception in remote client handler", e)
throw e throw e
} }
@ -419,7 +391,7 @@ class RemoteClientHandler(
client.connection = bootstrap.connect(remoteAddress) client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) { if (!client.connection.isSuccess) {
client.notifyListeners(RemoteClientError(client.connection.getCause, client)) client.notifyListeners(RemoteClientError(client.connection.getCause, client.module, client.remoteAddress))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress) log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", client.connection.getCause) log.slf4j.debug("Reconnection failed", client.connection.getCause)
} }
@ -430,7 +402,7 @@ class RemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = { def connect = {
client.notifyListeners(RemoteClientConnected(client)) client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow client.resetReconnectionTimeWindow
} }
@ -440,19 +412,19 @@ class RemoteClientHandler(
sslHandler.handshake.addListener(new ChannelFutureListener { sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = { def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) connect if (future.isSuccess) connect
else throw new RemoteClientException("Could not establish SSL handshake", client) else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress)
} }
}) })
} else connect } else connect
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client)) client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress)
} }
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.notifyListeners(RemoteClientError(event.getCause, client)) client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress))
if (event.getCause ne null) if (event.getCause ne null)
log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause) log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause)
else else
@ -530,27 +502,6 @@ object RemoteServer {
} }
} }
/**
* Life-cycle events for RemoteServer.
*/
sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule
case class RemoteServerStarted(
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerShutdown(
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerError(
@BeanProperty val cause: Throwable,
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerClientConnected(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientClosed(
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
/** /**
* Provides the implementation of the Netty remote support * Provides the implementation of the Netty remote support
@ -1110,7 +1061,6 @@ class RemoteServerHandler(
val id = actorInfo.getId val id = actorInfo.getId
val sessionActorRefOrNull = findSessionActor(id, channel) val sessionActorRefOrNull = findSessionActor(id, channel)
if (sessionActorRefOrNull ne null) { if (sessionActorRefOrNull ne null) {
log.slf4j.debug("Found session actor with id {} for channel {} = {}",Array[AnyRef](id, channel, sessionActorRefOrNull))
sessionActorRefOrNull sessionActorRefOrNull
} else { } else {
// we dont have it in the session either, see if we have a factory for it // we dont have it in the session either, see if we have a factory for it
@ -1170,9 +1120,7 @@ class RemoteServerHandler(
if (actorRefOrNull ne null) if (actorRefOrNull ne null)
actorRefOrNull actorRefOrNull
else else { // the actor has not been registered globally. See if we have it in the session
{
// the actor has not been registered globally. See if we have it in the session
val sessionActorRefOrNull = createSessionActor(actorInfo, channel) val sessionActorRefOrNull = createSessionActor(actorInfo, channel)
if (sessionActorRefOrNull ne null) if (sessionActorRefOrNull ne null)
sessionActorRefOrNull sessionActorRefOrNull