diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala
index 09f936d07e..6cf39729af 100644
--- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala
+++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala
@@ -216,9 +216,11 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
       import context.dispatcher // FIXME is this the right EC for the future below?
       val mode = if (t.rateMBit < 0.0f) Unthrottled
       else if (t.rateMBit == 0.0f) Blackhole
-      else TokenBucket(500, t.rateMBit * 125000.0, 0, 0)
+      // Conversion needed as the TokenBucket measures in octets: 125000 Octets/s = 1Mbit/s
+      else TokenBucket(capacity = 500, tokensPerSecond = t.rateMBit * 125000.0, lastSend = 0, availableTokens = 0)
       val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode))
+
       cmdFuture onSuccess {
         case b: Boolean ⇒ self ! ToServer(Done)
         case _ ⇒ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " +
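// A minimal, standalone sketch (not part of the patch) of the unit conversion that the new
// comment documents: ThrottleMsg carries a rate in Mbit/s while the TokenBucket counts octets,
// and 1 Mbit/s = 1,000,000 bits/s ÷ 8 bits per octet = 125,000 octets/s. The helper name
// `octetsPerSecond` is made up for illustration.
object ThrottleRateConversion {
  def octetsPerSecond(rateMBit: Double): Double = rateMBit * 125000.0

  def main(args: Array[String]): Unit = {
    println(octetsPerSecond(1.0)) // 125000.0 octets/s for 1 Mbit/s
    println(octetsPerSecond(0.5)) // 62500.0 octets/s for 0.5 Mbit/s
  }
}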
diff --git a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
index 63c51ef2b8..7500016788 100644
--- a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
+++ b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.locks.{ ReentrantLock, Lock }
 class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] {
 
   private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map())
-  private final val failureDectorCreationLock: Lock = new ReentrantLock
+  private final val failureDetectorCreationLock: Lock = new ReentrantLock
 
   /**
    * Returns true if the resource is considered to be up and healthy and returns false otherwise. For unregistered
@@ -30,13 +30,14 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec
     case _ ⇒ true
   }
 
-  @tailrec final override def heartbeat(resource: A): Unit = {
+  final override def heartbeat(resource: A): Unit = {
 
     resourceToFailureDetector.get.get(resource) match {
       case Some(failureDetector) ⇒ failureDetector.heartbeat()
      case None ⇒
         // First one wins and creates the new FailureDetector
-        if (failureDectorCreationLock.tryLock()) try {
+        failureDetectorCreationLock.lock()
+        try {
          // First check for non-existing key was outside the lock, and a second thread might just released the lock
          // when this one acquired it, so the second check is needed.
          val oldTable = resourceToFailureDetector.get
@@ -48,8 +49,7 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec
             newDetector.heartbeat()
             resourceToFailureDetector.set(oldTable + (resource -> newDetector))
           }
-      } finally failureDectorCreationLock.unlock()
-      else heartbeat(resource) // The thread that lost the race will try to reread
+        } finally failureDetectorCreationLock.unlock()
     }
   }
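// A reduced sketch, not the patched file itself, of the locking pattern the rewritten heartbeat()
// follows: reads go through the AtomicReference without locking; only a miss takes the
// ReentrantLock unconditionally (no more tryLock/@tailrec retry) and re-checks the map, because
// another thread may have registered the detector between the first check and lock acquisition.
// The Registry/Detector names below are placeholders for the real types.
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

class Registry[A, Detector](factory: () ⇒ Detector, heartbeatOf: Detector ⇒ Unit) {
  private val table = new AtomicReference[Map[A, Detector]](Map.empty)
  private val creationLock = new ReentrantLock

  def heartbeat(resource: A): Unit = table.get.get(resource) match {
    case Some(detector) ⇒ heartbeatOf(detector)
    case None ⇒
      creationLock.lock()
      try {
        // Second check: the detector may have been created while this thread waited for the lock.
        val current = table.get
        current.get(resource) match {
          case Some(detector) ⇒ heartbeatOf(detector)
          case None ⇒
            val detector = factory()
            heartbeatOf(detector)
            table.set(current + (resource -> detector))
        }
      } finally creationLock.unlock()
  }
}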
diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
index dd1a5074a3..dbe31e94dd 100644
--- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
+++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
@@ -12,7 +12,6 @@ import akka.remote.transport.AssociationHandle._
 import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
 import akka.serialization.Serialization
 import akka.util.ByteString
-import java.net.URLEncoder
 import scala.util.control.NonFatal
 
 /**
@@ -244,7 +243,7 @@ private[remote] class EndpointWriter(
         val readerDispatcher = msgDispatch
         reader = Some(
           context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)),
-            "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next())))
+            "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())))
         h.readHandlerPromise.success(reader.get)
       case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" +
         "reader.", null)
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index 7313954232..c5d79a9df7 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -60,6 +60,10 @@ class RemotingSettings(val config: Config) {
     cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) }
   }
 }
+private[remote] object AddressUrlEncoder {
+  def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8")
+}
+
 private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension
 
 private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
@@ -444,6 +448,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
       case (_, (transportAddress, _)) ⇒ transportAddress
     } map {
       case (a, t) if t.size > 1 ⇒
+        // FIXME: Throwing on the wrong thread
        throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null)
       case (a, t) ⇒ a -> t.head._1
     }
@@ -474,7 +479,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
         endpointSettings,
         AkkaPduProtobufCodec))
         .withDispatcher("akka.remoting.writer-dispatcher"),
-      "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()))
+      "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
   }
 
   private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime()
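// A small usage sketch of the new AddressUrlEncoder helper introduced in Remoting.scala above.
// Actor names cannot contain characters such as ':', '/' and '@' that appear in a rendered
// Address, so the address is URL-encoded before being embedded in names like "endpointWriter-...".
// The address string below is a made-up example; only java.net.URLEncoder, which the helper
// wraps, is assumed.
import java.net.URLEncoder

object AddressUrlEncoderExample extends App {
  val rawAddress = "akka://sys@remotehost:2552"
  val encoded = URLEncoder.encode(rawAddress, "utf-8")
  println(encoded)                            // akka%3A%2F%2Fsys%40remotehost%3A2552
  println("endpointWriter-" + encoded + "-1") // safe to use as an actor name
}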
diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
index a1be183665..d6a84cbdf3 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
@@ -9,14 +9,13 @@
 import akka.remote.transport.AkkaProtocolTransport._
 import akka.remote.transport.AssociationHandle._
 import akka.remote.transport.ProtocolStateActor._
 import akka.remote.transport.Transport._
-import akka.remote.{ PhiAccrualFailureDetector, FailureDetector, RemoteActorRefProvider }
+import akka.remote.{ AddressUrlEncoder, PhiAccrualFailureDetector, FailureDetector, RemoteActorRefProvider }
 import akka.util.ByteString
 import com.typesafe.config.Config
 import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
 import scala.concurrent.{ Future, Promise }
 import scala.util.control.NonFatal
 import scala.util.{ Success, Failure }
-import java.net.URLEncoder
 import scala.collection.immutable
 import akka.remote.transport.ActorTransportAdapter._
@@ -130,7 +129,7 @@ private[transport] class AkkaProtocolManager(
   }
 
   private def actorNameFor(remoteAddress: Address): String =
-    "akkaProtocol-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + nextId.next()
+    "akkaProtocol-" + AddressUrlEncoder(remoteAddress) + "-" + nextId.next()
 
   private def ready: Receive = {
     case InboundAssociation(handle) ⇒
@@ -453,10 +452,13 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
       wrappedHandle.disassociate()
   }
 
+  private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit =
+    readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
+
   private def notifyOutboundHandler(wrappedHandle: AssociationHandle,
                                     statusPromise: Promise[Status]): Future[HandleEventListener] = {
     val readHandlerPromise: Promise[HandleEventListener] = Promise()
-    readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
+    listenForListenerRegistration(readHandlerPromise)
 
     val exposedHandle =
       new AkkaProtocolHandle(
@@ -475,7 +477,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
                             originAddress: Address,
                             associationListener: AssociationEventListener): Future[HandleEventListener] = {
     val readHandlerPromise: Promise[HandleEventListener] = Promise()
-    readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
+    listenForListenerRegistration(readHandlerPromise)
 
     val exposedHandle =
       new AkkaProtocolHandle(
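// A reduced sketch, with hypothetical stand-in types, of the promise-to-message pattern that the
// new listenForListenerRegistration helper factors out of the two notify*Handler methods: the
// future behind the read-handler promise is mapped into a message and piped back to the actor, so
// registration is processed from the actor's own mailbox rather than on the thread that completes
// the promise. Only akka.pattern.pipe is assumed; the actor system is left running for brevity.
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.pipe
import scala.concurrent.Promise

final case class ListenerRegistered(listener: String) // stand-in for HandleListenerRegistered

class RegistrationExample extends Actor {
  import context.dispatcher // execution context for the future transformation and pipeTo

  private def listenForListenerRegistration(readHandlerPromise: Promise[String]): Unit =
    readHandlerPromise.future.map { ListenerRegistered(_) } pipeTo self

  private val readHandlerPromise = Promise[String]()
  listenForListenerRegistration(readHandlerPromise)
  readHandlerPromise.success("listener-1") // in the real code the handle user completes this promise

  def receive = {
    case ListenerRegistered(listener) ⇒ println(s"registered $listener inside the actor")
  }
}

object RegistrationExample extends App {
  val system = ActorSystem("registration-demo")
  system.actorOf(Props[RegistrationExample], "demo")
}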
diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
index 80cfd1e099..6a7ed190a6 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import scala.util.control.NonFatal
 import akka.dispatch.ThreadPoolConfig
 import akka.remote.transport.AssociationHandle.HandleEventListener
+import java.util.concurrent.atomic.AtomicInteger
 
 object NettyTransportSettings {
   sealed trait Mode
@@ -91,13 +92,10 @@ class NettyTransportSettings(config: Config) {
 
 }
 
-trait HasTransport {
+trait CommonHandlers extends NettyHelpers {
   protected val transport: NettyTransport
-}
-
-trait CommonHandlers extends NettyHelpers with HasTransport {
-
-  final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channels.add(e.getChannel)
+  final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channelGroup.add(e.getChannel)
 
   protected def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle
@@ -125,7 +123,7 @@ trait CommonHandlers extends NettyHelpers with HasTransport {
 
 abstract class ServerHandler(protected final val transport: NettyTransport,
                              private final val associationListenerFuture: Future[AssociationEventListener])
-  extends NettyServerHelpers with CommonHandlers with HasTransport {
+  extends NettyServerHelpers with CommonHandlers {
 
   import transport.executionContext
@@ -140,7 +138,7 @@ abstract class ServerHandler(protected final val transport: NettyTransport,
 
 abstract class ClientHandler(protected final val transport: NettyTransport,
                              private final val statusPromise: Promise[Status])
-  extends NettyClientHelpers with CommonHandlers with HasTransport {
+  extends NettyClientHelpers with CommonHandlers {
 
   final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = {
     channel.setReadable(false)
@@ -154,8 +152,10 @@ private[transport] object NettyTransport {
   val FrameLengthFieldLength = 4
 
   def gracefulClose(channel: Channel): Unit = channel.disconnect().addListener(ChannelFutureListener.CLOSE)
+  val uniqueIdCounter = new AtomicInteger(0)
 }
 
+// FIXME: Split into separate UDP and TCP classes
 class NettyTransport(private val settings: NettyTransportSettings, private val system: ExtendedActorSystem) extends Transport {
 
   def this(system: ExtendedActorSystem, conf: Config) = this(new NettyTransportSettings(conf), system)
@@ -166,29 +166,36 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
   implicit val executionContext: ExecutionContext = system.dispatcher
 
   override val schemeIdentifier: String = TransportMode + (if (EnableSsl) ".ssl" else "")
-  override val maximumPayloadBytes: Int = 32000
+  override val maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification
 
   private final val isDatagram: Boolean = TransportMode == Udp
 
   @volatile private var localAddress: Address = _
-  @volatile private var masterChannel: Channel = _
+  @volatile private var serverChannel: Channel = _
 
   private val log = Logging(system, this.getClass)
 
   final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]()
 
-  val channels = new DefaultDisposableChannelGroup("netty-transport-" + Random.nextString(20))
-
-  private def executor: Executor = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
+  val channelGroup = new DefaultDisposableChannelGroup("akka-netty-transport-driver-channelgroup-" +
+    uniqueIdCounter.getAndIncrement)
 
   private val clientChannelFactory: ChannelFactory = TransportMode match {
-    case Tcp ⇒ new NioClientSocketChannelFactory(executor, executor, ClientSocketWorkerPoolSize)
-    case Udp ⇒ new NioDatagramChannelFactory(executor, ClientSocketWorkerPoolSize)
+    case Tcp ⇒
+      val boss, worker = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
+      new NioClientSocketChannelFactory(boss, worker, ClientSocketWorkerPoolSize)
+    case Udp ⇒
+      val pool = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
+      new NioDatagramChannelFactory(pool, ClientSocketWorkerPoolSize)
   }
 
   private val serverChannelFactory: ChannelFactory = TransportMode match {
-    case Tcp ⇒ new NioServerSocketChannelFactory(executor, executor, ServerSocketWorkerPoolSize)
-    case Udp ⇒ new NioDatagramChannelFactory(executor, ServerSocketWorkerPoolSize)
+    case Tcp ⇒
+      val boss, worker = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
+      new NioServerSocketChannelFactory(boss, worker, ServerSocketWorkerPoolSize)
+    case Udp ⇒
+      val pool = UseDispatcherForIo.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
+      new NioDatagramChannelFactory(pool, ServerSocketWorkerPoolSize)
   }
 
   private def newPipeline: DefaultChannelPipeline = {
@@ -232,6 +239,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
   }
 
   private def setupBootstrap[B <: Bootstrap](bootstrap: B, pipelineFactory: ChannelPipelineFactory): B = {
+    // FIXME: Expose these settings in configuration
     bootstrap.setPipelineFactory(pipelineFactory)
     bootstrap.setOption("backlog", settings.Backlog)
     bootstrap.setOption("tcpNoDelay", true)
@@ -276,17 +284,17 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
     val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise()
 
     try {
-      masterChannel = inboundBootstrap match {
+      serverChannel = inboundBootstrap match {
         case b: ServerBootstrap ⇒
           b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector))
         case b: ConnectionlessBootstrap ⇒
           b.bind(new InetSocketAddress(InetAddress.getByName(settings.Hostname), settings.PortSelector))
       }
 
       // Block reads until a handler actor is registered
-      masterChannel.setReadable(false)
-      channels.add(masterChannel)
+      serverChannel.setReadable(false)
+      channelGroup.add(serverChannel)
 
-      addressFromSocketAddress(masterChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match {
+      addressFromSocketAddress(serverChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match {
         case Some(address) ⇒
           val listenerPromise: Promise[AssociationEventListener] = Promise()
           listenPromise.success((address, listenerPromise))
@@ -294,12 +302,12 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
           listenerPromise.future.onSuccess {
             case listener: AssociationEventListener ⇒
               associationListenerPromise.success(listener)
-              masterChannel.setReadable(true)
+              serverChannel.setReadable(true)
           }
 
         case None ⇒ listenPromise.failure(
-          new NettyTransportException(s"Unknown local address type ${masterChannel.getLocalAddress.getClass}", null))
+          new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}", null))
       }
 
     } catch {
@@ -312,7 +320,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
   override def associate(remoteAddress: Address): Future[Status] = {
     val statusPromise: Promise[Status] = Promise()
 
-    if (!masterChannel.isBound) statusPromise.success(Fail(new NettyTransportException("Transport is not bound", null)))
+    if (!serverChannel.isBound) statusPromise.success(Fail(new NettyTransportException("Transport is not bound", null)))
 
     try {
       if (!isDatagram) {
@@ -367,10 +375,10 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
   }
 
   override def shutdown(): Unit = {
-    channels.unbind()
-    channels.disconnect().addListener(new ChannelGroupFutureListener {
+    channelGroup.unbind()
+    channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
      def operationComplete(future: ChannelGroupFuture) {
-        channels.close()
+        channelGroup.close()
         inboundBootstrap.releaseExternalResources()
       }
     })
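// A side note on the channel-factory wiring above, with a sketch that only assumes the Scala
// language rule involved: `val boss, worker = expr` desugars into two separate definitions, so the
// right-hand side is evaluated once per name and the TCP factories receive two distinct executors
// (or two dispatcher lookups), while the UDP branches deliberately build a single pool. The object
// and value names below are arbitrary.
import java.util.concurrent.Executors

object MultiValEvaluation extends App {
  val boss, worker = Executors.newCachedThreadPool() // right-hand side runs twice: two independent pools
  println(boss eq worker)                            // false
  boss.shutdown(); worker.shutdown()
}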
diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala
index 1f3dfbf18f..6911806e01 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala
@@ -15,7 +15,7 @@ private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEvent
   def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg }
 }
 
-private[remote] trait TcpHandlers extends CommonHandlers with HasTransport {
+private[remote] trait TcpHandlers extends CommonHandlers {
 
   import ChannelLocalActor._
diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
index 73ac1c7292..c949990c5c 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
@@ -10,7 +10,7 @@ import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
 import org.jboss.netty.channel._
 import scala.concurrent.{ Future, Promise }
 
-private[remote] trait UdpHandlers extends CommonHandlers with HasTransport {
+private[remote] trait UdpHandlers extends CommonHandlers {
 
   override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
     new UdpAssociationHandle(localAddress, remoteAddress, channel, transport)
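// A structural sketch, with stand-in types, of the mixin cleanup spread over the Netty files above:
// HasTransport is folded into CommonHandlers, which now declares the abstract `transport` member
// itself, so TcpHandlers, UdpHandlers, ServerHandler and ClientHandler mix in CommonHandlers alone.
// None of the Stub types below exist in the codebase; they only mirror the shape of the hierarchy.
trait NettyHelpersStub
class NettyTransportStub

trait CommonHandlersStub extends NettyHelpersStub {
  protected val transport: NettyTransportStub // previously supplied by the separate HasTransport trait
}

trait TcpHandlersStub extends CommonHandlersStub // no "with HasTransport" needed any more

abstract class ServerHandlerStub(protected final val transport: NettyTransportStub)
  extends CommonHandlersStub // the constructor parameter satisfies the inherited abstract val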