diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index f913fa0cbc..4bddb57795 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -11,9 +11,9 @@ import com.typesafe.config.Config import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException } import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException } import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap } -import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer} +import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer } import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener } +import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener } import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } @@ -41,6 +41,20 @@ object NettyFutureBridge { }) p.future } + + def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = { + import scala.collection.JavaConverters._ + val p = Promise[ChannelGroup] + nettyFuture.addListener(new ChannelGroupFutureListener { + def operationComplete(future: ChannelGroupFuture): Unit = p complete Try( + if (future.isCompleteSuccess) future.getGroup + else throw future.iterator.asScala.collectFirst { + case f if f.isCancelled ⇒ new CancellationException + case f if !f.isSuccess ⇒ f.getCause + } getOrElse new IllegalStateException("Error reported in ChannelGroupFuture, but no error found in individual futures.")) + }) + p.future + } } class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace { @@ -348,21 +362,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s } override def shutdown(): Unit = { - // Force flush by trying to write an empty buffer and wait for success - channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener { - def operationComplete(future: ChannelGroupFuture) { - channelGroup.unbind() - channelGroup.disconnect().addListener(new ChannelGroupFutureListener { - def operationComplete(future: ChannelGroupFuture) { - channelGroup.close().addListener(new ChannelGroupFutureListener { - def operationComplete(future: ChannelGroupFuture) { - inboundBootstrap.releaseExternalResources() - } - }) - } - }) - } - }) + def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ ⇒ c.getGroup } + for { + // Force flush by trying to write an empty buffer and wait for success + _ ← always(channelGroup.write(ChannelBuffers.buffer(0))) + _ ← always({ channelGroup.unbind(); channelGroup.disconnect() }) + _ ← always(channelGroup.close()) + } inboundBootstrap.releaseExternalResources() } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index cb4251bf92..c52e8d9bc9 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -35,7 +35,7 @@ private[remote] trait UdpHandlers extends CommonHandlers { } else { val listener = transport.udpConnectionTable.get(inetSocketAddress) val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() - if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes)) + if (bytes.length > 0) listener notify InboundPayload(ByteString(bytes)) } case _ ⇒ }