Merge pull request #991 from drewhk/wip-updated-nettyfuturebridge-drewhk

Cleaned up chaining of ChannelGroupFutures
This commit is contained in:
drewhk 2013-01-03 09:59:43 -08:00
commit 3ceb02f846
2 changed files with 24 additions and 18 deletions

View file

@ -11,9 +11,9 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
@ -41,6 +41,20 @@ object NettyFutureBridge {
})
p.future
}
def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
import scala.collection.JavaConverters._
val p = Promise[ChannelGroup]
nettyFuture.addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture): Unit = p complete Try(
if (future.isCompleteSuccess) future.getGroup
else throw future.iterator.asScala.collectFirst {
case f if f.isCancelled new CancellationException
case f if !f.isSuccess f.getCause
} getOrElse new IllegalStateException("Error reported in ChannelGroupFuture, but no error found in individual futures."))
})
p.future
}
}
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
@ -348,21 +362,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
}
override def shutdown(): Unit = {
// Force flush by trying to write an empty buffer and wait for success
channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
channelGroup.unbind()
channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
channelGroup.close().addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
inboundBootstrap.releaseExternalResources()
}
})
}
})
}
})
def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ c.getGroup }
for {
// Force flush by trying to write an empty buffer and wait for success
_ always(channelGroup.write(ChannelBuffers.buffer(0)))
_ always({ channelGroup.unbind(); channelGroup.disconnect() })
_ always(channelGroup.close())
} inboundBootstrap.releaseExternalResources()
}

View file

@ -35,7 +35,7 @@ private[remote] trait UdpHandlers extends CommonHandlers {
} else {
val listener = transport.udpConnectionTable.get(inetSocketAddress)
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
if (bytes.length > 0) listener notify InboundPayload(ByteString(bytes))
}
case _
}