Merge pull request #982 from akka/wip-minor-cleanup-√

Moderate restructuring of NettyTransport
This commit is contained in:
Viktor Klang (√) 2012-12-23 13:16:51 -08:00
commit 3cd9cca74a
2 changed files with 68 additions and 100 deletions

View file

@ -1,6 +1,6 @@
package akka.remote.transport.netty
import akka.ConfigurationException
import akka.{ OnlyCauseStackTrace, ConfigurationException }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.event.Logging
import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup }
@ -8,8 +8,8 @@ import akka.remote.transport.Transport._
import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode }
import akka.remote.transport.{ AssociationHandle, Transport }
import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors }
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel._
@ -18,8 +18,8 @@ import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServer
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
import scala.concurrent.{ ExecutionContext, Promise, Future }
import scala.util.Random
import scala.util.control.NonFatal
import scala.util.{ Try, Random }
import util.control.{ NoStackTrace, NonFatal }
import akka.dispatch.ThreadPoolConfig
import akka.remote.transport.AssociationHandle.HandleEventListener
import java.util.concurrent.atomic.AtomicInteger
@ -30,7 +30,20 @@ object NettyTransportSettings {
case object Udp extends Mode { override def toString = "udp" }
}
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
object NettyFutureBridge {
def apply(nettyFuture: ChannelFuture): Future[Channel] = {
val p = Promise[Channel]()
nettyFuture.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = p complete Try(
if (future.isSuccess) future.getChannel
else if (future.isCancelled) throw new CancellationException
else throw future.getCause)
})
p.future
}
}
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
def this(msg: String) = this(msg, null)
}
@ -39,9 +52,9 @@ class NettyTransportSettings(config: Config) {
import config._
val TransportMode: Mode = getString("transport-protocol") match {
case "tcp" Tcp
case "udp" Udp
case s @ _ throw new ConfigurationException("Unknown transport: " + s)
case "tcp" Tcp
case "udp" Udp
case unknown throw new ConfigurationException(s"Unknown transport: $unknown")
}
val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp)
@ -54,10 +67,9 @@ class NettyTransportSettings(config: Config) {
}
private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match {
case 0 None
case x if x < 0
throw new ConfigurationException(s"Setting '$s' must be 0 or positive (and fit in an Int)")
case other Some(other)
case 0 None
case x if x < 0 throw new ConfigurationException(s"Setting '$s' must be 0 or positive (and fit in an Int)")
case other Some(other)
}
val ConnectionTimeout: FiniteDuration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
@ -270,113 +282,69 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
def addressFromSocketAddress(addr: SocketAddress,
systemName: Option[String] = None,
hostName: Option[String] = None): Option[Address] = {
addr match {
case sa: InetSocketAddress
Some(Address(schemeIdentifier, systemName.getOrElse(""), hostName.getOrElse(sa.getHostName), sa.getPort))
case _ None
}
hostName: Option[String] = None): Option[Address] = addr match {
case sa: InetSocketAddress Some(Address(schemeIdentifier, systemName.getOrElse(""), hostName.getOrElse(sa.getHostName), sa.getPort))
case _ None
}
def addressToSocketAddress(addr: Address): InetSocketAddress =
new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get)
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise()
try {
serverChannel = inboundBootstrap match {
case b: ServerBootstrap b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector))
case b: ConnectionlessBootstrap
b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector))
override def listen: Future[(Address, Promise[AssociationEventListener])] =
(Promise[(Address, Promise[AssociationEventListener])]() complete Try {
val address = addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector))
val newServerChannel = inboundBootstrap match {
case b: ServerBootstrap b.bind(address)
case b: ConnectionlessBootstrap b.bind(address)
}
// Block reads until a handler actor is registered
serverChannel.setReadable(false)
channelGroup.add(serverChannel)
newServerChannel.setReadable(false)
channelGroup.add(newServerChannel)
addressFromSocketAddress(serverChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match {
serverChannel = newServerChannel
addressFromSocketAddress(newServerChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match {
case Some(address)
val listenerPromise: Promise[AssociationEventListener] = Promise()
listenPromise.success((address, listenerPromise))
localAddress = address
listenerPromise.future.onSuccess {
case listener: AssociationEventListener
associationListenerPromise.success(listener)
serverChannel.setReadable(true)
}
case None
listenPromise.failure(
new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}"))
associationListenerPromise.future.onSuccess { case listener newServerChannel.setReadable(true) }
(address, associationListenerPromise)
case None throw new NettyTransportException(s"Unknown local address type ${newServerChannel.getLocalAddress.getClass}")
}
} catch {
case NonFatal(e) listenPromise.failure(e)
}
listenPromise.future
}
}).future
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
val statusPromise: Promise[AssociationHandle] = Promise()
if (!serverChannel.isBound) statusPromise.failure(new NettyTransportException("Transport is not bound"))
if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound"))
else {
try {
if (!isDatagram) {
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
connectFuture.addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture) {
if (!future.isSuccess)
statusPromise.failure(future.getCause)
else if (future.isCancelled)
statusPromise.failure(new NettyTransportException("Connection was cancelled"))
}
})
} else {
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
connectFuture.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (!future.isSuccess)
statusPromise.failure(future.getCause)
else if (future.isCancelled)
statusPromise.failure(new NettyTransportException("Connection was cancelled"))
else {
val handle: UdpAssociationHandle =
new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this)
future.getChannel.getRemoteAddress match {
case addr: InetSocketAddress
statusPromise.success(handle)
handle.readHandlerPromise.future.onSuccess {
case listener: HandleEventListener udpConnectionTable.put(addr, listener)
}
case a statusPromise.failure(
new NettyTransportException("Unknown remote address type " + a.getClass))
}
}
}
})
val statusPromise = Promise[AssociationHandle]()
(try {
val f = NettyFutureBridge(outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))) recover {
case c: CancellationException throw new NettyTransportException("Connection was cancelled")
}
if (isDatagram)
f map { channel
channel.getRemoteAddress match {
case addr: InetSocketAddress
val handle = new UdpAssociationHandle(localAddress, remoteAddress, channel, NettyTransport.this)
statusPromise.success(handle)
handle.readHandlerPromise.future.onSuccess { case listener udpConnectionTable.put(addr, listener) }
case unknown throw new NettyTransportException(s"Unknown remote address type ${unknown.getClass}")
}
}
else f
} catch {
case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException)
statusPromise.failure(InvalidAssociationException("Invalid association ", e))
Future.failed(InvalidAssociationException("Invalid association ", e))
case NonFatal(e)
statusPromise.failure(e)
Future.failed(e)
}) onFailure {
case t: ConnectException statusPromise failure new NettyTransportException(t.getMessage, t.getCause)
case t statusPromise failure t
}
}
statusPromise.future
statusPromise.future
}
}
override def shutdown(): Unit = {

View file

@ -22,8 +22,8 @@ private[remote] trait UdpHandlers extends CommonHandlers {
transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener) match {
case null listener notify InboundPayload(ByteString(msg.array()))
case oldReader
throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" +
s" but $oldReader was already registered.", null)
throw new NettyTransportException(
s"Listener $listener attempted to register for remote address $remoteSocketAddress but $oldReader was already registered.")
}
}