diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index da60c7880a..e68cc6536e 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -20,7 +20,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { "register its ServerSocketChannel with its selector" in new TestSetup "let the Bind commander know when binding is completed" in new TestSetup { - listener ! WorkerForCommandDone + listener ! ChannelRegistered bindCommander.expectMsg(Bound) } @@ -87,7 +87,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { parent.expectMsgType[RegisterChannel] def bindListener() { - listener ! WorkerForCommandDone + listener ! ChannelRegistered bindCommander.expectMsg(Bound) } diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index 8f152fe303..5b10023990 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -26,15 +26,15 @@ object IO { props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)), name = "selectors") - private def createKickStart(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = { + private def createWorkerMessage(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = { case cmd ⇒ val props = pf(cmd) val commander = sender WorkerForCommand(cmd, commander, props) } - def kickStartReceive(pf: PartialFunction[Any, Props]): Receive = { - case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createKickStart(pf)(cmd) + def workerForCommand(pf: PartialFunction[Any, Props]): Receive = { + case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createWorkerMessage(pf)(cmd) } } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index daadb0c8b5..59b0894543 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -44,10 +44,9 @@ abstract class SelectionHandlerSettings(config: Config) { private[io] object SelectionHandler { case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: Props) - // FIXME: all actors should listen to this - case object WorkerForCommandDone case class RegisterChannel(channel: SelectableChannel, initialOps: Int) + case object ChannelRegistered case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) } case object ChannelConnectable @@ -79,7 +78,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler case cmd: WorkerForCommand ⇒ // FIXME: factor out to common - withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) ! WorkerForCommandDone } + withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) } case RegisterChannel(channel, initialOps) ⇒ execute(registerChannel(channel, sender, initialOps)) @@ -88,7 +87,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler commander ! cmd.failureMessage case Retry(cmd, retriesLeft) ⇒ - withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) ! WorkerForCommandDone } + withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) } case Terminated(child) ⇒ execute(unregister(child)) @@ -144,6 +143,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler new Task { def tryRun() { updateKeyMap(channelActor, channel.register(selector, initialOps, channelActor)) + channelActor ! ChannelRegistered } } diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index ea2703c40e..2f7cf9c5fa 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -8,7 +8,7 @@ import java.nio.channels.SocketChannel import scala.collection.immutable import akka.actor.ActorRef import akka.io.Inet.SocketOption -import akka.io.SelectionHandler.RegisterChannel +import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } /** * An actor handling the connection state machine for an incoming, already connected @@ -22,8 +22,9 @@ private[io] class TcpIncomingConnection(_channel: SocketChannel, context.watch(handler) // sign death pact - completeConnect(handler, options) context.parent ! RegisterChannel(channel, 0) - def receive = PartialFunction.empty + def receive = { + case ChannelRegistered ⇒ completeConnect(handler, options) + } } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index e883310ae3..356319286f 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -50,7 +50,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, log.debug("Successfully bound to {}", endpoint) def receive: Receive = { - case WorkerForCommandDone ⇒ + case ChannelRegistered ⇒ bindCommander ! Bound context.become(bound) } diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index 81d3f91fa2..8761104ba5 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -45,7 +45,7 @@ import akka.io.IO.SelectorBasedManager */ private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { - def receive = kickStartReceive { + def receive = workerForCommand { case Connect(remoteAddress, localAddress, options) ⇒ val commander = sender Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options)) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 2c28bbada2..6c9231fa46 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -28,17 +28,18 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, localAddress.foreach(channel.socket.bind) options.foreach(_.beforeConnect(channel.socket)) + selector ! RegisterChannel(channel, SelectionKey.OP_CONNECT) - log.debug("Attempting connection to {}", remoteAddress) - if (channel.connect(remoteAddress)) - completeConnect(commander, options) - else { - selector ! RegisterChannel(channel, SelectionKey.OP_CONNECT) - context.become(connecting(commander, options)) + def receive: Receive = { + case ChannelRegistered ⇒ + log.debug("Attempting connection to {}", remoteAddress) + if (channel.connect(remoteAddress)) + completeConnect(commander, options) + else { + context.become(connecting(commander, options)) + } } - def receive: Receive = PartialFunction.empty - def connecting(commander: ActorRef, options: immutable.Traversable[SocketOption]): Receive = { case ChannelConnectable ⇒ try { diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index c3600ff121..cfafb8bc52 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -13,6 +13,8 @@ import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ import scala.collection.immutable import scala.util.control.NonFatal +import akka.io.UdpFF.Received +import akka.io.SelectionHandler.RegisterChannel private[io] class UdpFFListener(selectorRouter: ActorRef, handler: ActorRef, @@ -39,20 +41,15 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, bindCommander ! Bound log.debug("Successfully bound to {}", endpoint) - def receive: Receive = receiveInternal orElse sendHandlers + def receive: Receive = { + case ChannelRegistered ⇒ context.become(readHandlers orElse sendHandlers, discardOld = true) + } - def receiveInternal: Receive = { + def readHandlers: Receive = { case StopReading ⇒ selector ! StopReading case ResumeReading ⇒ selector ! ReadInterest case ChannelReadable ⇒ doReceive(handler, None) - // case CommandFailed(RegisterChannel(channel, _)) ⇒ - // log.warning("Could not bind to UDP port since selector capacity limit is reached, aborting bind") - // try channel.close() - // catch { - // case NonFatal(e) ⇒ log.error(e, "Error closing channel") - // } - case Unbind ⇒ log.debug("Unbinding endpoint {}", endpoint) channel.close() diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 4499cb8115..3f5c677991 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -49,7 +49,7 @@ private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udp props = Props(new UdpFFSender(udpFF, selectorPool)), name = "simplesend") - def receive = kickStartReceive { + def receive = workerForCommand { case Bind(handler, endpoint, options) ⇒ val commander = sender Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options)) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala index 4e6245c160..24e222e71c 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala @@ -6,7 +6,7 @@ package akka.io import akka.actor._ import java.nio.channels.DatagramChannel import akka.io.UdpFF._ -import akka.io.SelectionHandler.RegisterChannel +import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. @@ -21,9 +21,12 @@ private[io] class UdpFFSender(val udpFF: UdpFFExt, val selector: ActorRef) } selector ! RegisterChannel(channel, 0) - def receive: Receive = internalReceive orElse sendHandlers + def receive: Receive = { + case ChannelRegistered ⇒ context.become(simpleSendHandlers orElse sendHandlers, discardOld = true) + case _ ⇒ sender ! SimpleSendReady // FIXME: queueing here? + } - def internalReceive: Receive = { + def simpleSendHandlers: Receive = { case SimpleSender ⇒ sender ! SimpleSendReady }