Reverted to using LocalActorRefs for client-managed actors to get supervision working, more migrated tests

This commit is contained in:
Viktor Klang 2010-12-20 16:58:05 +01:00
parent dc15562ce1
commit 6edfb7d5b8
13 changed files with 421 additions and 588 deletions

View file

@ -11,8 +11,6 @@ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.japi.Creator
import akka.actor.{newUuid,ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule}
import akka.config.Config._
import akka.serialization.RemoteActorSerialization._
@ -34,13 +32,11 @@ import org.jboss.netty.handler.ssl.SslHandler
import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import scala.collection.mutable.{ HashSet, HashMap }
import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException
import akka.actor. {ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
/**
* Life-cycle events for RemoteClient.
@ -78,7 +74,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(Some(serviceId), implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
@ -117,12 +113,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
remoteClients -= hash
}
}
//TODO: REVISIT IMPLEMENT OR REMOVE
/*private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor().registerSupervisorForActor(actorRef)
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor(actorRef.homeAddress, None).registerSupervisorForActor(actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor().deregisterSupervisorForActor(actorRef)*/
clientFor(actorRef.homeAddress, None).deregisterSupervisorForActor(actorRef)
/**
* Clean-up all open connections.
@ -265,7 +261,7 @@ class RemoteClient private[akka] (
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
log.slf4j.debug("sending message: {} is running {} has future {}", Array[AnyRef](request, isRunning.asInstanceOf[AnyRef], senderFuture))
log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
if (isRunning) {
if (request.getOneWay) {
connection.getChannel.write(request)
@ -280,8 +276,7 @@ class RemoteClient private[akka] (
Some(futureResult)
}
} else {
val exception = new RemoteClientException(
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
notifyListeners(RemoteClientError(exception, this))
throw exception
}
@ -580,32 +575,26 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
// case _ =>
// RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
//}
RemoteActorRef(Some(serviceId), className, hostname, port, timeout, loader)
RemoteActorRef(serviceId, className, hostname, port, timeout, loader)
}
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = {
val Host = this.hostname
val Port = this.port
(host,port) match {
case (Host, Port) if optimizeLocalScoped_? =>
ActorRegistry.actorOf(clazz) //Local
case _ =>
new RemoteActorRef(None,clazz.getName,host,port,timeout,None)
}
import ReflectiveAccess.{ createInstance, noParams, noArgs }
val ref = new LocalActorRef(() => createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
new InetSocketAddress(host, port))
ref.timeout = timeout
ref
}
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
@volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
def hostname = address.hostname
def port = address.port
def name = "RemoteServer@" + hostname + ":" + port
private val _isRunning = new Switch(false)
val name = "NettyRemoteServer@" + host + ":" + port
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
@ -614,23 +603,57 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(host, port)))
serverModule.notifyListeners(RemoteServerStarted(serverModule))
def shutdown {
try {
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => serverModule.log.slf4j.warn("Could not close remote server channel in a graceful way")
}
}
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def hostname = currentServer.get match {
case Some(s) => s.host
case None => ReflectiveAccess.Remote.HOSTNAME
}
def port = currentServer.get match {
case Some(s) => s.port
case None => ReflectiveAccess.Remote.PORT
}
def name = currentServer.get match {
case Some(s) => s.name
case None => "NettyRemoteServer@" + ReflectiveAccess.Remote.HOSTNAME + ":" + ReflectiveAccess.Remote.PORT
}
private val _isRunning = new Switch(false)
def isRunning = _isRunning.isOn
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
address = Address(_hostname,_port)
log.slf4j.info("Starting remote server at [{}:{}]", hostname, port)
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
notifyListeners(RemoteServerStarted(this))
log.slf4j.debug("Starting up remote server on {}:{}",_hostname, _port)
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e =>
@ -642,14 +665,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
def shutdownServerModule = guard withGuard {
_isRunning switchOff {
try {
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
notifyListeners(RemoteServerShutdown(this))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => log.slf4j.warn("Could not close remote server channel in a graceful way")
currentServer.getAndSet(None) foreach {
instance =>
log.slf4j.debug("Shutting down remote server on {}:{}",instance.host, instance.port)
instance.shutdown
}
}
}
@ -1136,7 +1155,7 @@ class RemoteServerHandler(
val id = actorInfo.getId
val sessionActorRefOrNull = findSessionActor(id, channel)
if (sessionActorRefOrNull ne null) {
log.debug("found session actor with id {} for channel {}",id, channel)
log.slf4j.debug("Found session actor with id {} for channel {} = {}",Array[AnyRef](id, channel, sessionActorRefOrNull))
sessionActorRefOrNull
} else {
// we dont have it in the session either, see if we have a factory for it