diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 7a50a3556c..891c930ef5 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -63,16 +63,20 @@ trait RemoteModule extends Logging { */ sealed trait RemoteClientLifeCycleEvent //TODO: REVISIT: Document change from RemoteClient to RemoteClientModule + remoteAddress case class RemoteClientError( - @BeanProperty val cause: Throwable, - @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty cause: Throwable, + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected( - @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent case class RemoteClientConnected( - @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent case class RemoteClientStarted( - @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent case class RemoteClientShutdown( - @BeanProperty val client: RemoteClientModule, val remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty client: RemoteClientModule, remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent /** diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index b79bc696a1..ca60b39ab3 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -263,15 +263,32 @@ class RemoteClient private[akka] ( log.slf4j.debug("sending message: {} has future {}", request, senderFuture) if (isRunning) { if (request.getOneWay) { - connection.getChannel.write(request) + connection.getChannel.write(request).addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled) { + //We don't care about that right now + } else if (!future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + } + } + }) None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) - log.slf4j.debug("Stashing away future for {}",futureUuid) - connection.getChannel.write(request) + + connection.getChannel.write(request).addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled) { + //We don't care about that right now + } else if (!future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + } else { + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) + } + } + }) Some(futureResult) } } else {