Adding TODOs for solving the problem with sender references and senderproxies for remote TypedActor calls
This commit is contained in:
parent
b41ecfe09e
commit
87069f6e02
2 changed files with 21 additions and 11 deletions
|
|
@ -842,7 +842,6 @@ class RemoteServerHandler(
|
||||||
}
|
}
|
||||||
|
|
||||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||||
import scala.collection.JavaConversions.asScalaIterable
|
|
||||||
val clientAddress = getClientAddress(ctx)
|
val clientAddress = getClientAddress(ctx)
|
||||||
|
|
||||||
// stop all session actors
|
// stop all session actors
|
||||||
|
|
@ -956,6 +955,14 @@ class RemoteServerHandler(
|
||||||
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
|
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
|
||||||
val actorInfo = request.getActorInfo
|
val actorInfo = request.getActorInfo
|
||||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||||
|
/* TODO Implement sender references for remote TypedActor calls
|
||||||
|
if (request.hasSender) {
|
||||||
|
val iface = //TODO extrace the senderProxy interface from the request, load it as a class using the application loader
|
||||||
|
val ref = RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)
|
||||||
|
val senderTA = TypedActor.createProxyForRemoteActorRef[AnyRef](iface, ref)
|
||||||
|
Some()
|
||||||
|
} else None
|
||||||
|
*/
|
||||||
|
|
||||||
val typedActor = createTypedActor(actorInfo, channel)
|
val typedActor = createTypedActor(actorInfo, channel)
|
||||||
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
|
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
|
||||||
|
|
@ -996,7 +1003,8 @@ class RemoteServerHandler(
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
|
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
|
||||||
|
//TODO SenderContextInfo.senderActorRef.value = sender
|
||||||
|
//TODO SenderContextInfo.senderProxy.value = senderProxy
|
||||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
|
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
|
||||||
else {
|
else {
|
||||||
//Sends the response
|
//Sends the response
|
||||||
|
|
@ -1022,21 +1030,23 @@ class RemoteServerHandler(
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
}
|
}
|
||||||
|
|
||||||
messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread
|
messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread
|
||||||
//If it's a future, we can lift on that to defer the send to when the future is completed
|
//If it's a future, we can lift on that to defer the send to when the future is completed
|
||||||
case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
|
case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
|
||||||
case other => sendResponse(Right(other))
|
case other => sendResponse(Right(other))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: InvocationTargetException =>
|
|
||||||
EventHandler.error(e, this, e.getMessage)
|
|
||||||
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
|
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
EventHandler.error(e, this, e.getMessage)
|
EventHandler.error(e, this, e.getMessage)
|
||||||
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
|
write(channel, createErrorReplyMessage(e match {
|
||||||
|
case e: InvocationTargetException => e.getCause
|
||||||
|
case e => e
|
||||||
|
}, request, AkkaActorType.TypedActor))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
|
} finally {
|
||||||
|
//TODO SenderContextInfo.senderActorRef.value = None ?
|
||||||
|
//TODO SenderContextInfo.senderProxy.value = None ?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -846,19 +846,19 @@ private[akka] abstract class ActorAspect {
|
||||||
case -1 => s
|
case -1 => s
|
||||||
case x => s.substring(0,x + TypedActor.AW_PROXY_PREFIX.length)
|
case x => s.substring(0,x + TypedActor.AW_PROXY_PREFIX.length)
|
||||||
}
|
}
|
||||||
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
|
//TODO: Add ownerTypeHint and parameter types to the TypedActorInfo?
|
||||||
val message: Tuple3[String, Array[Class[_]], Array[AnyRef]] =
|
val message: Tuple3[String, Array[Class[_]], Array[AnyRef]] =
|
||||||
((extractOwnerTypeHint(methodRtti.getMethod.getDeclaringClass.getName),
|
((extractOwnerTypeHint(methodRtti.getMethod.getDeclaringClass.getName),
|
||||||
methodRtti.getParameterTypes,
|
methodRtti.getParameterTypes,
|
||||||
methodRtti.getParameterValues))
|
methodRtti.getParameterValues))
|
||||||
|
|
||||||
//FIXME send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server
|
//TODO send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server
|
||||||
//val senderProxy = Option(SenderContextInfo.senderProxy.value)
|
//val senderProxy = Option(SenderContextInfo.senderProxy.value)
|
||||||
|
|
||||||
val future = Actor.remote.send[AnyRef](
|
val future = Actor.remote.send[AnyRef](
|
||||||
message, senderActorRef, None, remoteAddress.get,
|
message, senderActorRef, None, remoteAddress.get,
|
||||||
timeout, isOneWay, actorRef,
|
timeout, isOneWay, actorRef,
|
||||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
Some((interfaceClass.getName, methodRtti.getMethod.getName)), //TODO Include the interface of the senderProxy here somehow
|
||||||
ActorType.TypedActor,
|
ActorType.TypedActor,
|
||||||
None) //TODO: REVISIT: Use another classloader?
|
None) //TODO: REVISIT: Use another classloader?
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue