Merged with Viktors work with removing client managed actors. Also removed actor.id, added actor.address

This commit is contained in:
Jonas Bonér 2011-04-08 21:16:05 +02:00
commit 3374eef6ce
35 changed files with 214 additions and 1490 deletions

View file

@ -63,7 +63,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private val lock = new ReadWriteGuard
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, timeout, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
@ -119,16 +119,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef))
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard {
remoteClients.get(Address(actorRef.homeAddress.get)) match {
case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef)
case None => actorRef
}
}
/**
* Clean-up all open connections.
*/
@ -142,15 +132,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
remoteClients.foreach({ case (addr, client) => client.shutdown })
remoteClients.clear
}
def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = {
remoteActors.put(Address(hostname, port), uuid)
}
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = {
remoteActors.remove(Address(hostname,port), uuid)
//TODO: should the connection be closed when the last actor deregisters?
}
}
/**
@ -170,7 +151,6 @@ abstract class RemoteClient private[akka] (
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
@ -219,7 +199,7 @@ abstract class RemoteClient private[akka] (
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.address,
actorRef.actorClassName,
actorRef.timeout,
Right(message),
@ -320,16 +300,6 @@ abstract class RemoteClient private[akka] (
pendingRequest = pendingRequests.peek // try to grab next message
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
}
/**
@ -358,7 +328,7 @@ class ActiveRemoteClient private[akka] (
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -433,7 +403,6 @@ class ActiveRemoteClient private[akka] (
class ActiveRemoteClientPipelineFactory(
name: String,
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
supervisors: ConcurrentMap[Uuid, ActorRef],
bootstrap: ClientBootstrap,
remoteAddress: InetSocketAddress,
timer: HashedWheelTimer,
@ -450,7 +419,7 @@ class ActiveRemoteClientPipelineFactory(
case _ => (Nil,Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client)
val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil
new StaticChannelPipeline(stages: _*)
}
@ -463,7 +432,6 @@ class ActiveRemoteClientPipelineFactory(
class ActiveRemoteClientHandler(
val name: String,
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
val supervisors: ConcurrentMap[Uuid, ActorRef],
val bootstrap: ClientBootstrap,
val remoteAddress: InetSocketAddress,
val timer: HashedWheelTimer,
@ -488,19 +456,7 @@ class ActiveRemoteClientHandler(
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
val exception = parseException(reply, client.loader)
if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, exception)
}
future.completeWithException(exception)
future.completeWithException(parseException(reply, client.loader))
}
case other =>
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
@ -584,25 +540,12 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByIdOrUuid(serviceId,serviceId)
val localRef = findActorByAddressOrUuid(serviceId,serviceId)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
RemoteActorRef(serviceId, className, host, port, timeout, loader)
}
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = {
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
return new LocalActorRef(factory, None, false, "todo") // Code is much simpler with return
}
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), true, "todo")
//ref.timeout = timeout //removed because setting default timeout should be done after construction
ref
RemoteActorRef(serviceId, className, timeout, loader)
}
}
@ -762,7 +705,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
actors.remove(actorRef.id, actorRef)
actors.remove(actorRef.address, actorRef)
actorsByUuid.remove(actorRef.uuid, actorRef)
}
}
@ -899,19 +842,18 @@ class RemoteServerHandler(
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
import scala.collection.JavaConversions.asScalaIterable
val clientAddress = getClientAddress(ctx)
// stop all session actors
for (map <- Option(sessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) {
actor <- collectionAsScalaIterable(map.values)) {
try { actor ! PoisonPill } catch { case e: Exception => }
}
//FIXME switch approach or use other thread to execute this
// stop all typed session actors
for (map <- Option(typedSessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) {
actor <- collectionAsScalaIterable(map.values)) {
try { TypedActor.stop(actor) } catch { case e: Exception => }
}
@ -990,7 +932,7 @@ class RemoteServerHandler(
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getAddress,
actorInfo.getTarget,
actorInfo.getTimeout,
r,
@ -1013,6 +955,14 @@ class RemoteServerHandler(
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo
val typedActorInfo = actorInfo.getTypedActorInfo
/* TODO Implement sender references for remote TypedActor calls
if (request.hasSender) {
val iface = //TODO extrace the senderProxy interface from the request, load it as a class using the application loader
val ref = RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)
val senderTA = TypedActor.createProxyForRemoteActorRef[AnyRef](iface, ref)
Some()
} else None
*/
val typedActor = createTypedActor(actorInfo, channel)
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
@ -1053,7 +1003,8 @@ class RemoteServerHandler(
try {
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
//TODO SenderContextInfo.senderActorRef.value = sender
//TODO SenderContextInfo.senderProxy.value = senderProxy
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
else {
//Sends the response
@ -1061,7 +1012,7 @@ class RemoteServerHandler(
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getAddress,
actorInfo.getTarget,
actorInfo.getTimeout,
result,
@ -1079,21 +1030,23 @@ class RemoteServerHandler(
server.notifyListeners(RemoteServerError(e, server))
}
messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread
messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread
//If it's a future, we can lift on that to defer the send to when the future is completed
case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
case other => sendResponse(Right(other))
}
}
} catch {
case e: InvocationTargetException =>
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
write(channel, createErrorReplyMessage(e match {
case e: InvocationTargetException => e.getCause
case e => e
}, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
} finally {
//TODO SenderContextInfo.senderActorRef.value = None ?
//TODO SenderContextInfo.senderProxy.value = None ?
}
}
@ -1114,50 +1067,22 @@ class RemoteServerHandler(
*/
private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val address = actorInfo.getAddress
findSessionActor(id, channel) match {
findSessionActor(address, channel) match {
case null => // we dont have it in the session either, see if we have a factory for it
server.findActorFactory(id) match {
server.findActorFactory(address) match {
case null => null
case factory =>
val actorRef = factory()
actorRef.uuid = parseUuid(uuid) //FIXME is this sensible?
sessionActors.get(channel).put(id, actorRef)
sessionActors.get(channel).put(address, actorRef)
actorRef.start //Start it where's it's created
}
case sessionActor => sessionActor
}
}
private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = {
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val timeout = actorInfo.getTimeout
val name = actorInfo.getTarget
try {
if (UNTRUSTED_MODE) throw new SecurityException(
"RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
actorRef.uuid = parseUuid(uuid)
actorRef.id = id
actorRef.timeout = timeout
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
actorRef.start //Start it where it's created
} catch {
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
@ -1167,14 +1092,11 @@ class RemoteServerHandler(
*/
private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val address = actorInfo.getAddress
server.findActorByIdOrUuid(id, parseUuid(uuid).toString) match {
case null => // the actor has not been registered globally. See if we have it in the session
createSessionActor(actorInfo, channel) match {
case null => createClientManagedActor(actorInfo) // maybe it is a client managed actor
case sessionActor => sessionActor
}
server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null => createSessionActor(actorInfo, channel)
case actorRef => actorRef
}
}
@ -1183,63 +1105,25 @@ class RemoteServerHandler(
* gets the actor from the session, or creates one if there is a factory for it
*/
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={
val id = actorInfo.getId
findTypedSessionActor(id, channel) match {
val address = actorInfo.getAddress
findTypedSessionActor(address, channel) match {
case null =>
server.findTypedActorFactory(id) match {
server.findTypedActorFactory(address) match {
case null => null
case factory =>
val newInstance = factory()
typedSessionActors.get(channel).put(id, newInstance)
typedSessionActors.get(channel).put(address, newInstance)
newInstance
}
case sessionActor => sessionActor
}
}
private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = {
val typedActorInfo = actorInfo.getTypedActorInfo
val interfaceClassname = typedActorInfo.getInterface
val targetClassname = actorInfo.getTarget
val uuid = actorInfo.getUuid
try {
if (UNTRUSTED_MODE) throw new SecurityException(
"RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
val (interfaceClass, targetClass) =
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
applicationLoader.get.loadClass(targetClassname))
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid
newInstance
} catch {
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
}
private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
val uuid = actorInfo.getUuid
server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match {
case null => // the actor has not been registered globally. See if we have it in the session
createTypedSessionActor(actorInfo, channel) match {
case null =>
// FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail
/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]")
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
throw e
*/ createClientManagedTypedActor(actorInfo) // client-managed actor
case sessionActor => sessionActor
}
server.findTypedActorByAddressOrUuid(actorInfo.getAddress, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null => createTypedSessionActor(actorInfo, channel)
case typedActor => typedActor
}
}
@ -1249,7 +1133,7 @@ class RemoteServerHandler(
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getAddress,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(exception),
@ -1284,7 +1168,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
protected val open = new AtomicBoolean(true)
override def add(channel: Channel): Boolean = guard withReadGuard {
if(open.get) {
if (open.get) {
super.add(channel)
} else {
channel.close