Removing the old typed actors

This commit is contained in:
Viktor Klang 2011-05-19 14:29:21 +02:00
parent 8741454e89
commit 236d8e07e9
62 changed files with 76 additions and 4545 deletions

View file

@ -7,7 +7,6 @@ package akka.remote.netty
import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future }
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.remoteinterface._
@ -17,15 +16,13 @@ import akka.actor.{
LocalActorRef,
Actor,
RemoteActorRef,
TypedActor,
ActorRef,
IllegalActorStateException,
RemoteActorSystemMessage,
uuidFrom,
Uuid,
Exit,
LifeCycleMessage,
ActorType AkkaActorType
LifeCycleMessage
}
import akka.actor.Actor._
import akka.config.Config._
@ -74,9 +71,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, timeout, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]],
@ -84,10 +78,8 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType,
loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType))
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef))
private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient T): T = {
@ -204,9 +196,7 @@ abstract class RemoteClient private[akka] (
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
actorRef: ActorRef): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
@ -216,8 +206,6 @@ abstract class RemoteClient private[akka] (
Right(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
if (isAuthenticated.compareAndSet(false, true)) RemoteClientSettings.SECURE_COOKIE else None).build, senderFuture)
}
@ -649,25 +637,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
}
}
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard {
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
else registerTypedActor(id, typedActor, typedActors)
}
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def registerTypedPerSessionActor(id: String, factory: AnyRef): Unit = guard withGuard {
registerTypedPerSessionActor(id, () factory, typedActorsFactories)
}
/**
* Register RemoteModule Actor by a specific 'id' passed as argument.
* <p/>
@ -703,16 +672,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
registry.put(id, factory) //TODO change to putIfAbsent
}
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
if (_isRunning.isOn)
registry.put(id, typedActor) //TODO change to putIfAbsent
}
private def registerTypedPerSessionActor[Key](id: Key, factory: () AnyRef, registry: ConcurrentHashMap[Key, () AnyRef]) {
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
/**
* Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID).
*/
@ -749,26 +708,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
actorsFactories.remove(id)
}
}
/**
* Unregister RemoteModule Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedActor(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
else typedActors.remove(id)
}
}
/**
* Unregister RemoteModule Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedPerSessionActor(id: String): Unit =
if (_isRunning.isOn) typedActorsFactories.remove(id)
}
/**
@ -818,7 +757,6 @@ class RemoteServerHandler(
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
//Writes the specified message to the specified channel and propagates write errors to listeners
private def write(channel: Channel, payload: AkkaRemoteProtocol): Unit = {
@ -847,7 +785,6 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
}
@ -863,15 +800,6 @@ class RemoteServerHandler(
try { actor ! PoisonPill } catch { case e: Exception }
}
//FIXME switch approach or use other thread to execute this
// stop all typed session actors
for (
map Option(typedSessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try { TypedActor.stop(actor) } catch { case e: Exception }
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
}
@ -901,14 +829,8 @@ class RemoteServerHandler(
case _ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
request.getActorInfo.getActorType match {
case SCALA_ACTOR dispatchToActor(request, channel)
case TYPED_ACTOR dispatchToTypedActor(request, channel)
case JAVA_ACTOR throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
}
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) =
dispatchToActor(request, channel)
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
@ -917,7 +839,7 @@ class RemoteServerHandler(
try { createActor(actorInfo, channel) } catch {
case e: SecurityException
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
write(channel, createErrorReplyMessage(e, request))
server.notifyListeners(RemoteServerError(e, server))
return
}
@ -943,7 +865,7 @@ class RemoteServerHandler(
None,
Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case l: Left[Throwable, Any] write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor))
case l: Left[Throwable, Any] write(channel, createErrorReplyMessage(l.a, request))
case r: Right[Throwable, Any]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
@ -953,8 +875,6 @@ class RemoteServerHandler(
r,
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
@ -965,116 +885,12 @@ class RemoteServerHandler(
}
}
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo
val typedActorInfo = actorInfo.getTypedActorInfo
/* TODO Implement sender references for remote TypedActor calls
if (request.hasSender) {
val iface = //TODO extrace the senderProxy interface from the request, load it as a class using the application loader
val ref = RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)
val senderTA = TypedActor.createProxyForRemoteActorRef[AnyRef](iface, ref)
Some()
} else None
*/
val typedActor = createTypedActor(actorInfo, channel)
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
val (ownerTypeHint, argClasses, args) =
MessageSerializer
.deserialize(request.getMessage)
.asInstanceOf[Tuple3[String, Array[Class[_]], Array[AnyRef]]]
def resolveMethod(bottomType: Class[_],
typeHint: String,
methodName: String,
methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
var typeToResolve = bottomType
var targetMethod: java.lang.reflect.Method = null
var firstException: NoSuchMethodException = null
while ((typeToResolve ne null) && (targetMethod eq null)) {
if ((typeHint eq null) || typeToResolve.getName.startsWith(typeHint)) {
try {
targetMethod = typeToResolve.getDeclaredMethod(methodName, methodSignature: _*)
targetMethod.setAccessible(true)
} catch {
case e: NoSuchMethodException
if (firstException eq null)
firstException = e
}
}
typeToResolve = typeToResolve.getSuperclass
}
if ((targetMethod eq null) && (firstException ne null))
throw firstException
targetMethod
}
try {
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
//TODO SenderContextInfo.senderActorRef.value = sender
//TODO SenderContextInfo.senderProxy.value = senderProxy
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
else {
//Sends the response
def sendResponse(result: Either[Throwable, Any]): Unit = try {
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getAddress,
actorInfo.getTimeout,
result,
true,
None,
None,
AkkaActorType.TypedActor,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
case e: Exception
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread
//If it's a future, we can lift on that to defer the send to when the future is completed
case f: Future[_] f.onComplete(future sendResponse(future.value.get))
case other sendResponse(Right(other))
}
}
} catch {
case e: Exception
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e match {
case e: InvocationTargetException e.getCause
case e e
}, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
}
finally {
//TODO SenderContextInfo.senderActorRef.value = None ?
//TODO SenderContextInfo.senderProxy.value = None ?
}
}
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
}
private def findTypedSessionActor(id: String, channel: Channel): AnyRef =
typedSessionActors.get(channel) match {
case null null
case map map get id
}
/**
* gets the actor from the session, or creates one if there is a factory for it
*/
@ -1114,34 +930,7 @@ class RemoteServerHandler(
}
}
/**
* gets the actor from the session, or creates one if there is a factory for it
*/
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
val address = actorInfo.getAddress
findTypedSessionActor(address, channel) match {
case null
server.findTypedActorFactory(address) match {
case null null
case factory
val newInstance = factory()
typedSessionActors.get(channel).put(address, newInstance)
newInstance
}
case sessionActor sessionActor
}
}
private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
val uuid = actorInfo.getUuid
server.findTypedActorByAddressOrUuid(actorInfo.getAddress, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null createTypedSessionActor(actorInfo, channel)
case typedActor typedActor
}
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): AkkaRemoteProtocol = {
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
@ -1151,8 +940,6 @@ class RemoteServerHandler(
Left(exception),
true,
None,
None,
actorType,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
RemoteEncoder.encode(messageBuilder.build)