Adding synchronous writes to NettyRemoteSupport

This commit is contained in:
Viktor Klang 2011-03-23 15:00:29 +01:00
parent 1747f2120c
commit dfbc694059

View file

@ -81,8 +81,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private[akka] def withClientFor[T]( private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = {
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
val key = Address(address) val key = Address(address)
lock.readLock.lock lock.readLock.lock
try { try {
@ -217,15 +215,13 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) { if (isRunning) {
if (request.getOneWay) { if (request.getOneWay) {
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { val future = currentChannel.write(RemoteEncoder.encode(request))
def operationComplete(future: ChannelFuture) { future.awaitUninterruptibly()
if (future.isCancelled) { if (!future.isCancelled && !future.isSuccess) {
//We don't care about that right now
} else if (!future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
} }
}
})
None None
} else { } else {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
@ -238,7 +234,9 @@ abstract class RemoteClient private[akka] (
futures.remove(futureUuid) //Clean this up futures.remove(futureUuid) //Clean this up
//We don't care about that right now //We don't care about that right now
} else if (!future.isSuccess) { } else if (!future.isSuccess) {
futures.remove(futureUuid) //Clean this up val f = futures.remove(futureUuid) //Clean this up
if (f ne null)
f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
} }
} }