diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index aa66dc160a..da60c7880a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -20,7 +20,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { "register its ServerSocketChannel with its selector" in new TestSetup "let the Bind commander know when binding is completed" in new TestSetup { - listener ! KickStartDone + listener ! WorkerForCommandDone bindCommander.expectMsg(Bound) } @@ -36,13 +36,13 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { parent.expectMsg(AcceptInterest) // FIXME: ugly stuff here - selectorRouter.expectMsgType[KickStartCommand] - selectorRouter.expectMsgType[KickStartCommand] + selectorRouter.expectMsgType[WorkerForCommand] + selectorRouter.expectMsgType[WorkerForCommand] selectorRouter.expectNoMsg(100.millis) // and pick up the last remaining connection on the next ChannelAcceptable listener ! ChannelAcceptable - selectorRouter.expectMsgType[KickStartCommand] + selectorRouter.expectMsgType[WorkerForCommand] } "react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup { @@ -61,7 +61,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { attemptConnectionToEndpoint() listener ! ChannelAcceptable - val props = selectorRouter.expectMsgType[KickStartCommand].childProps + val props = selectorRouter.expectMsgType[WorkerForCommand].childProps // FIXME: need to instantiate propss //selectorRouter.expectMsgType[RegisterChannel].channel.isOpen must be(true) @@ -87,7 +87,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { parent.expectMsgType[RegisterChannel] def bindListener() { - listener ! KickStartDone + listener ! WorkerForCommandDone bindCommander.expectMsg(Bound) } diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index 2b6cbd9f57..8f152fe303 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -6,7 +6,7 @@ package akka.io import akka.actor._ import akka.routing.RandomRouter -import akka.io.SelectionHandler.KickStartCommand +import akka.io.SelectionHandler.WorkerForCommand object IO { @@ -16,22 +16,25 @@ object IO { def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager + trait HasFailureMessage { + def failureMessage: Any + } + abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { val selectorPool = context.actorOf( props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)), name = "selectors") - def createKickStart(pf: PartialFunction[Any, Props], cmd: Any): PartialFunction[Any, KickStartCommand] = { - pf.andThen { props ⇒ + private def createKickStart(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = { + case cmd ⇒ + val props = pf(cmd) val commander = sender - KickStartCommand(cmd, commander, props) - } + WorkerForCommand(cmd, commander, props) } def kickStartReceive(pf: PartialFunction[Any, Props]): Receive = { - //case KickStartFailed = - case cmd if pf.isDefinedAt(cmd) ⇒ selectorPool ! createKickStart(pf, cmd)(cmd) + case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createKickStart(pf)(cmd) } } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 64d6b389f2..daadb0c8b5 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -14,6 +14,7 @@ import scala.concurrent.duration._ import akka.actor._ import com.typesafe.config.Config import akka.actor.Terminated +import akka.io.IO.HasFailureMessage abstract class SelectionHandlerSettings(config: Config) { import config._ @@ -42,14 +43,12 @@ abstract class SelectionHandlerSettings(config: Config) { private[io] object SelectionHandler { - //FIXME: temporary - case class KickStartCommand(apiCommand: Any, commander: ActorRef, childProps: Props) + case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: Props) // FIXME: all actors should listen to this - case object KickStartDone - case class KickStartFailed(apiCommand: Any, commander: ActorRef) + case object WorkerForCommandDone case class RegisterChannel(channel: SelectableChannel, initialOps: Int) - case class Retry(command: KickStartCommand, retriesLeft: Int) { require(retriesLeft >= 0) } + case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) } case object ChannelConnectable case object ChannelAcceptable @@ -75,21 +74,21 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler case ReadInterest ⇒ execute(enableInterest(OP_READ, sender)) case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender)) + // FIXME: provide StopReading functionality //case StopReading ⇒ execute(disableInterest(OP_READ, sender)) - case cmd: KickStartCommand ⇒ + case cmd: WorkerForCommand ⇒ // FIXME: factor out to common - withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) ! KickStartDone } + withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) ! WorkerForCommandDone } case RegisterChannel(channel, initialOps) ⇒ execute(registerChannel(channel, sender, initialOps)) - case Retry(cmd, 0) ⇒ - // FIXME: extractors - manager ! KickStartFailed(cmd.apiCommand, cmd.commander) + case Retry(WorkerForCommand(cmd, commander, _), 0) ⇒ + commander ! cmd.failureMessage case Retry(cmd, retriesLeft) ⇒ - withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) ! KickStartDone } + withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) ! WorkerForCommandDone } case Terminated(child) ⇒ execute(unregister(child)) @@ -114,7 +113,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler // we can never recover from failures of a connection or listener child override def supervisorStrategy = SupervisorStrategy.stoppingStrategy - def withCapacityProtection(cmd: KickStartCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { + def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { log.debug("Executing {}", cmd) if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { body diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index f7a31a76fe..625cbd1e7a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -6,7 +6,7 @@ package akka.io import java.net.InetSocketAddress import java.net.Socket -import akka.io.Inet.SocketOption +import akka.io.Inet._ import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable @@ -58,7 +58,9 @@ object Tcp extends ExtensionKey[TcpExt] { } /// COMMANDS - trait Command + trait Command extends IO.HasFailureMessage { + def failureMessage = CommandFailed(this) + } case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 541d101327..e883310ae3 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -5,7 +5,7 @@ package akka.io import java.net.InetSocketAddress -import java.nio.channels.{ SelectionKey, ServerSocketChannel } +import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel } import scala.annotation.tailrec import scala.collection.immutable import scala.util.control.NonFatal @@ -13,6 +13,17 @@ import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ import akka.io.Inet.SocketOption import akka.io.Tcp._ +import akka.io.IO.HasFailureMessage + +private[io] object TcpListener { + + case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage { + def failureMessage = FailedRegisterIncoming(channel) + } + + case class FailedRegisterIncoming(channel: SocketChannel) + +} private[io] class TcpListener(selectorRouter: ActorRef, handler: ActorRef, @@ -23,6 +34,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, options: immutable.Traversable[SocketOption]) extends Actor with ActorLogging { def selector: ActorRef = context.parent + import TcpListener._ import tcp.Settings._ context.watch(handler) // sign death pact @@ -38,7 +50,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, log.debug("Successfully bound to {}", endpoint) def receive: Receive = { - case KickStartDone ⇒ + case WorkerForCommandDone ⇒ bindCommander ! Bound context.become(bound) } @@ -47,12 +59,12 @@ private[io] class TcpListener(selectorRouter: ActorRef, case ChannelAcceptable ⇒ acceptAllPending(BatchAcceptLimit) - // case CommandFailed(RegisterIncomingConnection(socketChannel, _, _)) ⇒ - // log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection") - // try socketChannel.close() - // catch { - // case NonFatal(e) ⇒ log.error(e, "Error closing channel") - // } + case FailedRegisterIncoming(socketChannel) ⇒ + log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection") + try socketChannel.close() + catch { + case NonFatal(e) ⇒ log.error(e, "Error closing channel") + } case Unbind ⇒ log.debug("Unbinding endpoint {}", endpoint) @@ -72,9 +84,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, if (socketChannel != null) { log.debug("New connection accepted") socketChannel.configureBlocking(false) - //selectorRouter ! RegisterIncomingConnection(socketChannel, handler, options) - // FIXME null is not nice. There is no explicit API command here - selectorRouter ! KickStartCommand(null, context.system.deadLetters, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options))) + selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options))) acceptAllPending(limit - 1) } } else context.parent ! AcceptInterest diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index a6d3cc95ab..81d3f91fa2 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -4,10 +4,8 @@ package akka.io -import akka.actor.{ ActorLogging, Actor, Props } -import akka.routing.RandomRouter import Tcp._ -import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } +import akka.actor.{ ActorLogging, Props } import akka.io.IO.SelectorBasedManager /** diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 31913dce3d..0d6c49b83e 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -15,7 +15,9 @@ object UdpFF extends ExtensionKey[UdpFFExt] { // Java API override def get(system: ActorSystem): UdpFFExt = system.extension(this) - trait Command + trait Command extends IO.HasFailureMessage { + def failureMessage = CommandFailed(this) + } case object NoAck case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index be417166d0..4499cb8115 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -3,11 +3,9 @@ */ package akka.io -import akka.actor.{ ActorRef, Props, Actor } -import akka.io.UdpFF._ -import akka.routing.RandomRouter -import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } +import akka.actor.{ ActorRef, Props } import akka.io.IO.SelectorBasedManager +import akka.io.UdpFF._ /** * UdpFFManager is a facade for simple fire-and-forget style UDP operations