Adding support for failed messages to be notified to listeners, this closes ticket #587
This commit is contained in:
parent
4994b13fd5
commit
718f831650
2 changed files with 32 additions and 11 deletions
|
|
@ -263,15 +263,32 @@ class RemoteClient private[akka] (
|
|||
log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
connection.getChannel.write(request).addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled) {
|
||||
//We don't care about that right now
|
||||
} else if (!future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
}
|
||||
}
|
||||
})
|
||||
None
|
||||
} else {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult)
|
||||
log.slf4j.debug("Stashing away future for {}",futureUuid)
|
||||
connection.getChannel.write(request)
|
||||
|
||||
connection.getChannel.write(request).addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled) {
|
||||
//We don't care about that right now
|
||||
} else if (!future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
} else {
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult)
|
||||
}
|
||||
}
|
||||
})
|
||||
Some(futureResult)
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue