Adding support for failed messages to be notified to listeners, this closes ticket #587

This commit is contained in:
Viktor Klang 2010-12-30 14:59:00 +01:00
parent 3da0669233
commit f1f8d64e3c
2 changed files with 32 additions and 11 deletions

View file

@ -63,16 +63,20 @@ trait RemoteModule extends Logging {
*/
sealed trait RemoteClientLifeCycleEvent //TODO: REVISIT: Document change from RemoteClient to RemoteClientModule + remoteAddress
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
@BeanProperty cause: Throwable,
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientWriteFailed(
@BeanProperty request: AnyRef,
@BeanProperty cause: Throwable,
@BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
/**

View file

@ -263,15 +263,32 @@ class RemoteClient private[akka] (
log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
if (isRunning) {
if (request.getOneWay) {
connection.getChannel.write(request)
connection.getChannel.write(request).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//We don't care about that right now
} else if (!future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
}
})
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult)
log.slf4j.debug("Stashing away future for {}",futureUuid)
connection.getChannel.write(request)
connection.getChannel.write(request).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//We don't care about that right now
} else if (!future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
} else {
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult)
}
}
})
Some(futureResult)
}
} else {