diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 3fab1b20c1..5baddfed89 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -81,8 +81,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { - loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY - val key = Address(address) lock.readLock.lock try { @@ -217,15 +215,13 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //We don't care about that right now - } else if (!future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + throw future.getCause + } + None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -238,7 +234,9 @@ abstract class RemoteClient private[akka] ( futures.remove(futureUuid) //Clean this up //We don't care about that right now } else if (!future.isSuccess) { - futures.remove(futureUuid) //Clean this up + val f = futures.remove(futureUuid) //Clean this up + if (f ne null) + f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } }