diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 1106c75b21..2f10dfb4f7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -1,6 +1,6 @@ package akka.remote.transport.netty -import akka.ConfigurationException +import akka.{ OnlyCauseStackTrace, ConfigurationException } import akka.actor.{ Address, ExtendedActorSystem } import akka.event.Logging import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup } @@ -8,8 +8,8 @@ import akka.remote.transport.Transport._ import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode } import akka.remote.transport.{ AssociationHandle, Transport } import com.typesafe.config.Config -import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress } -import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors } +import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException } +import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException } import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap } import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel._ @@ -18,8 +18,8 @@ import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServer import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } import scala.concurrent.{ ExecutionContext, Promise, Future } -import scala.util.Random -import scala.util.control.NonFatal +import scala.util.{ Try, Random } +import util.control.{ NoStackTrace, NonFatal } import akka.dispatch.ThreadPoolConfig import akka.remote.transport.AssociationHandle.HandleEventListener import java.util.concurrent.atomic.AtomicInteger @@ -30,7 +30,20 @@ object NettyTransportSettings { case object Udp extends Mode { override def toString = "udp" } } -class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) { +object NettyFutureBridge { + def apply(nettyFuture: ChannelFuture): Future[Channel] = { + val p = Promise[Channel]() + nettyFuture.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture): Unit = p complete Try( + if (future.isSuccess) future.getChannel + else if (future.isCancelled) throw new CancellationException + else throw future.getCause) + }) + p.future + } +} + +class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace { def this(msg: String) = this(msg, null) } @@ -39,9 +52,9 @@ class NettyTransportSettings(config: Config) { import config._ val TransportMode: Mode = getString("transport-protocol") match { - case "tcp" ⇒ Tcp - case "udp" ⇒ Udp - case s @ _ ⇒ throw new ConfigurationException("Unknown transport: " + s) + case "tcp" ⇒ Tcp + case "udp" ⇒ Udp + case unknown ⇒ throw new ConfigurationException(s"Unknown transport: $unknown") } val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp) @@ -54,10 +67,9 @@ class NettyTransportSettings(config: Config) { } private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match { - case 0 ⇒ None - case x if x < 0 ⇒ - throw new ConfigurationException(s"Setting '$s' must be 0 or positive (and fit in an Int)") - case other ⇒ Some(other) + case 0 ⇒ None + case x if x < 0 ⇒ throw new ConfigurationException(s"Setting '$s' must be 0 or positive (and fit in an Int)") + case other ⇒ Some(other) } val ConnectionTimeout: FiniteDuration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) @@ -270,113 +282,69 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s def addressFromSocketAddress(addr: SocketAddress, systemName: Option[String] = None, - hostName: Option[String] = None): Option[Address] = { - addr match { - case sa: InetSocketAddress ⇒ - Some(Address(schemeIdentifier, systemName.getOrElse(""), hostName.getOrElse(sa.getHostName), sa.getPort)) - - case _ ⇒ None - } + hostName: Option[String] = None): Option[Address] = addr match { + case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName.getOrElse(""), hostName.getOrElse(sa.getHostName), sa.getPort)) + case _ ⇒ None } def addressToSocketAddress(addr: Address): InetSocketAddress = new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) - override def listen: Future[(Address, Promise[AssociationEventListener])] = { - val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise() - - try { - serverChannel = inboundBootstrap match { - case b: ServerBootstrap ⇒ b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector)) - case b: ConnectionlessBootstrap ⇒ - b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector)) + override def listen: Future[(Address, Promise[AssociationEventListener])] = + (Promise[(Address, Promise[AssociationEventListener])]() complete Try { + val address = addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector)) + val newServerChannel = inboundBootstrap match { + case b: ServerBootstrap ⇒ b.bind(address) + case b: ConnectionlessBootstrap ⇒ b.bind(address) } // Block reads until a handler actor is registered - serverChannel.setReadable(false) - channelGroup.add(serverChannel) + newServerChannel.setReadable(false) + channelGroup.add(newServerChannel) - addressFromSocketAddress(serverChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match { + serverChannel = newServerChannel + + addressFromSocketAddress(newServerChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match { case Some(address) ⇒ - val listenerPromise: Promise[AssociationEventListener] = Promise() - listenPromise.success((address, listenerPromise)) localAddress = address - listenerPromise.future.onSuccess { - case listener: AssociationEventListener ⇒ - associationListenerPromise.success(listener) - serverChannel.setReadable(true) - } - - case None ⇒ - listenPromise.failure( - new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}")) + associationListenerPromise.future.onSuccess { case listener ⇒ newServerChannel.setReadable(true) } + (address, associationListenerPromise) + case None ⇒ throw new NettyTransportException(s"Unknown local address type ${newServerChannel.getLocalAddress.getClass}") } - - } catch { - case NonFatal(e) ⇒ listenPromise.failure(e) - } - - listenPromise.future - } + }).future override def associate(remoteAddress: Address): Future[AssociationHandle] = { - val statusPromise: Promise[AssociationHandle] = Promise() - - if (!serverChannel.isBound) statusPromise.failure(new NettyTransportException("Transport is not bound")) + if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound")) else { - - try { - if (!isDatagram) { - val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) - - connectFuture.addListener(new ChannelFutureListener { - override def operationComplete(future: ChannelFuture) { - if (!future.isSuccess) - statusPromise.failure(future.getCause) - else if (future.isCancelled) - statusPromise.failure(new NettyTransportException("Connection was cancelled")) - - } - }) - - } else { - val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) - - connectFuture.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (!future.isSuccess) - statusPromise.failure(future.getCause) - else if (future.isCancelled) - statusPromise.failure(new NettyTransportException("Connection was cancelled")) - else { - val handle: UdpAssociationHandle = - new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this) - - future.getChannel.getRemoteAddress match { - case addr: InetSocketAddress ⇒ - statusPromise.success(handle) - handle.readHandlerPromise.future.onSuccess { - case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener) - } - case a ⇒ statusPromise.failure( - new NettyTransportException("Unknown remote address type " + a.getClass)) - } - } - } - }) + val statusPromise = Promise[AssociationHandle]() + (try { + val f = NettyFutureBridge(outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))) recover { + case c: CancellationException ⇒ throw new NettyTransportException("Connection was cancelled") } + if (isDatagram) + f map { channel ⇒ + channel.getRemoteAddress match { + case addr: InetSocketAddress ⇒ + val handle = new UdpAssociationHandle(localAddress, remoteAddress, channel, NettyTransport.this) + statusPromise.success(handle) + handle.readHandlerPromise.future.onSuccess { case listener ⇒ udpConnectionTable.put(addr, listener) } + case unknown ⇒ throw new NettyTransportException(s"Unknown remote address type ${unknown.getClass}") + } + } + else f } catch { - case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException) ⇒ - statusPromise.failure(InvalidAssociationException("Invalid association ", e)) - + Future.failed(InvalidAssociationException("Invalid association ", e)) case NonFatal(e) ⇒ - statusPromise.failure(e) + Future.failed(e) + }) onFailure { + case t: ConnectException ⇒ statusPromise failure new NettyTransportException(t.getMessage, t.getCause) + case t ⇒ statusPromise failure t } - } - statusPromise.future + statusPromise.future + } } override def shutdown(): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index 2b7af65c98..080b03c152 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -22,8 +22,8 @@ private[remote] trait UdpHandlers extends CommonHandlers { transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener) match { case null ⇒ listener notify InboundPayload(ByteString(msg.array())) case oldReader ⇒ - throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" + - s" but $oldReader was already registered.", null) + throw new NettyTransportException( + s"Listener $listener attempted to register for remote address $remoteSocketAddress but $oldReader was already registered.") } }