merged with upstream
This commit is contained in:
commit
cbca588248
8 changed files with 915 additions and 1549 deletions
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
|
|||
import java.util.{Map => JMap}
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage}
|
||||
import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.util._
|
||||
import akka.remote.protocol.RemoteProtocol._
|
||||
import akka.remote.protocol.RemoteProtocol.ActorType._
|
||||
|
|
@ -406,7 +406,7 @@ class RemoteServerPipelineFactory(
|
|||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
|
|
@ -481,10 +481,10 @@ class RemoteServerHandler(
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
||||
val message = event.getMessage
|
||||
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
|
||||
if (message.isInstanceOf[RemoteMessageProtocol]) {
|
||||
val requestProtocol = message.asInstanceOf[RemoteMessageProtocol]
|
||||
if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
|
||||
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
|
||||
handleRemoteMessageProtocol(requestProtocol, event.getChannel)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -500,8 +500,8 @@ class RemoteServerHandler(
|
|||
else None
|
||||
}
|
||||
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
||||
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteMessageProtocol[\n%s]", request.toString)
|
||||
request.getActorInfo.getActorType match {
|
||||
case SCALA_ACTOR => dispatchToActor(request, channel)
|
||||
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
|
||||
|
|
@ -510,7 +510,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel): Unit = {
|
||||
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
|
||||
val actorInfo = request.getActorInfo
|
||||
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
|
||||
|
||||
|
|
@ -519,7 +519,7 @@ class RemoteServerHandler(
|
|||
createActor(actorInfo).start
|
||||
} catch {
|
||||
case e: SecurityException =>
|
||||
channel.write(createErrorReplyMessage(e, request, true))
|
||||
channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
return
|
||||
}
|
||||
|
|
@ -537,7 +537,7 @@ class RemoteServerHandler(
|
|||
throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor")
|
||||
|
||||
case _ => // then match on user defined messages
|
||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
if (request.getOneWay) actorRef.!(message)(sender)
|
||||
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
|
|
@ -545,16 +545,24 @@ class RemoteServerHandler(
|
|||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
override def onComplete(result: AnyRef) {
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setUuid(request.getUuid)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(true)
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result),
|
||||
true,
|
||||
Some(actorRef),
|
||||
None,
|
||||
AkkaActorType.ScalaActor,
|
||||
None)
|
||||
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
try {
|
||||
channel.write(replyBuilder.build)
|
||||
channel.write(messageBuilder.build)
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
|
|
@ -562,7 +570,7 @@ class RemoteServerHandler(
|
|||
|
||||
override def onCompleteException(exception: Throwable) {
|
||||
try {
|
||||
channel.write(createErrorReplyMessage(exception, request, true))
|
||||
channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
|
|
@ -572,7 +580,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
val actorInfo = request.getActorInfo
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||
|
|
@ -583,24 +591,32 @@ class RemoteServerHandler(
|
|||
|
||||
try {
|
||||
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
|
||||
if (request.getIsOneWay) messageReceiver.invoke(typedActor, args: _*)
|
||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
|
||||
else {
|
||||
val result = messageReceiver.invoke(typedActor, args: _*)
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setUuid(request.getUuid)
|
||||
.setMessage(MessageSerializer.serialize(result))
|
||||
.setIsSuccessful(true)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
channel.write(replyBuilder.build)
|
||||
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result),
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
AkkaActorType.TypedActor,
|
||||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
channel.write(messageBuilder.build)
|
||||
}
|
||||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
channel.write(createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, false))
|
||||
channel.write(createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
|
|
@ -710,19 +726,26 @@ class RemoteServerHandler(
|
|||
} else typedActorOrNull
|
||||
}
|
||||
|
||||
private def createErrorReplyMessage(e: Throwable, request: RemoteRequestProtocol, isActor: Boolean): RemoteReplyProtocol = {
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
|
||||
val actorInfo = request.getActorInfo
|
||||
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setUuid(request.getUuid)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(isActor)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
replyBuilder.build
|
||||
log.error(exception, "Could not invoke remote actor [%s]", actorInfo.getTarget)
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Right(exception),
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
actorType,
|
||||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
messageBuilder.build
|
||||
}
|
||||
|
||||
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
|
||||
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
|
||||
val attachment = ctx.getAttachment
|
||||
if ((attachment ne null) &&
|
||||
attachment.isInstanceOf[String] &&
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue