Giving all remote messages their own uuid, reusing actorInfo.uuid for futures, closing ticket 580
This commit is contained in:
parent
091bb41930
commit
cb2054f103
2 changed files with 31 additions and 27 deletions
|
|
@ -56,7 +56,7 @@ case class RemoteClientShutdown(
|
|||
*/
|
||||
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
|
||||
|
||||
case class UnparsableException private[akka] (val originalClassName: String, val originalMessage: String) extends AkkaException(originalMessage)
|
||||
case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage)
|
||||
|
||||
/**
|
||||
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
|
||||
|
|
@ -416,17 +416,18 @@ class RemoteClientHandler(
|
|||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
try {
|
||||
val result = event.getMessage
|
||||
if (result.isInstanceOf[RemoteMessageProtocol]) {
|
||||
val reply = result.asInstanceOf[RemoteMessageProtocol]
|
||||
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
|
||||
event.getMessage match {
|
||||
case reply: RemoteMessageProtocol =>
|
||||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
|
||||
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
|
||||
if (reply.hasMessage) {
|
||||
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
|
||||
val message = MessageSerializer.deserialize(reply.getMessage)
|
||||
future.completeWithResult(message)
|
||||
} else {
|
||||
val exception = parseException(reply, client.loader)
|
||||
if (reply.hasSupervisorUuid()) {
|
||||
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
|
||||
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
|
||||
|
|
@ -434,14 +435,14 @@ class RemoteClientHandler(
|
|||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
|
||||
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, exception)
|
||||
}
|
||||
val exception = parseException(reply, client.loader)
|
||||
|
||||
future.completeWithException(exception)
|
||||
}
|
||||
futures remove replyUuid
|
||||
} else {
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
|
||||
|
||||
case message =>
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + message, client)
|
||||
client.notifyListeners(RemoteClientError(exception, client))
|
||||
throw exception
|
||||
}
|
||||
|
|
|
|||
|
|
@ -295,7 +295,10 @@ object RemoteActorSerialization {
|
|||
}
|
||||
val actorInfo = actorInfoBuilder.build
|
||||
val messageBuilder = RemoteMessageProtocol.newBuilder
|
||||
.setUuid(uuidProtocol)
|
||||
.setUuid({
|
||||
val messageUuid = newUuid
|
||||
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
|
||||
})
|
||||
.setActorInfo(actorInfo)
|
||||
.setOneWay(isOneWay)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue