diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index 91caabaeb7..cd9859c2d1 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -16,7 +16,6 @@ import java.net.Socket import java.net.ServerSocket import scala.concurrent.duration._ import scala.collection.immutable -import sun.security.tools.KeyTool.Command import akka.actor.ActorSystem object Tcp extends ExtensionKey[TcpExt] { @@ -33,6 +32,7 @@ object Tcp extends ExtensionKey[TcpExt] { address: InetSocketAddress, backlog: Int = 100, options: immutable.Seq[SO.SocketOption] = Nil) extends Command + case object Unbind extends Command case class Register(handler: ActorRef) extends Command object SO { @@ -161,8 +161,11 @@ object Tcp extends ExtensionKey[TcpExt] { /// EVENTS sealed trait Event + case object Bound extends Event + case class Received(data: ByteString) extends Event case class Connected(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress) extends Event + case class CommandFailed(cmd: Command) extends Event sealed trait Closed extends Event case object PeerClosed extends Closed @@ -174,6 +177,12 @@ object Tcp extends ExtensionKey[TcpExt] { case class RegisterClientChannel(channel: SocketChannel) case class RegisterServerChannel(channel: ServerSocketChannel) case class CreateConnection(channel: SocketChannel) + case class Reject(command: Command, commander: ActorRef) + // Retry should be sent by Selector actors to their parent router with retriesLeft decremented. If retries are + // depleted, the selector actor must reply directly to the manager with a Reject (above). + case class Retry(command: Command, retriesLeft: Int, commander: ActorRef) { + require(retriesLeft >= 0, "The upper limit for retries must be nonnegative.") + } case object ChannelConnectable case object ChannelAcceptable case object ChannelReadable @@ -202,6 +211,6 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { } val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props.empty.withDispatcher(Settings.ManagementDispatcher), "IO-TCP") + Props[TcpManager].withDispatcher(Settings.ManagementDispatcher), "IO-TCP") } \ No newline at end of file diff --git a/akka-io/src/main/scala/akka/io/TcpManager.scala b/akka-io/src/main/scala/akka/io/TcpManager.scala new file mode 100644 index 0000000000..344a861580 --- /dev/null +++ b/akka-io/src/main/scala/akka/io/TcpManager.scala @@ -0,0 +1,53 @@ +package akka.io + +import akka.actor.{ OneForOneStrategy, Actor, Props } +import akka.io.Tcp._ +import akka.routing.RandomRouter +import akka.actor.SupervisorStrategy.Restart + +/** + * TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections. + * + * TcpManager is obtainable by calling {{{ IO(TCP) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]]) + * + * == Bind == + * + * To bind and listen to a local address, a [[akka.io.Tcp.Bind]] command must be sent to this actor. If the binding + * was successful, the sender of the [[akka.io.Tcp.Bind]] will be notified with a [[akka.io.Tcp.Bound]] + * message. The sender of the [[akka.io.Tcp.Bound]] message is the Listener actor (an internal actor responsible for + * listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor. + * + * If the bind request is rejected because the Tcp system is not able to register more channels (see the nr-of-selectors + * and max-channels configuration options in the akka.io.tcp section of the configuration) the sender will be notified + * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. + * + * When an inbound TCP connection is established, the handler will be notified by a [[akka.io.Tcp.Connected]] message. + * The sender of this message is the Connection actor (an internal actor representing the TCP connection). At this point + * the procedure is the same as for outbound connections (see section below). + * + * == Connect == + * + * To initiate a connection to a remote server, a [[akka.io.Tcp.Connect]] message must be sent to this actor. If the + * connection succeeds, the sender will be notified with a [[akka.io.Tcp.Connected]] message. The sender of the + * [[akka.io.Tcp.Connected]] message is the Connection actor (an internal actor representing the TCP connection). Before + * starting to use the connection, a handler should be registered to the Connection actor by sending a [[akka.io.Tcp.Register]] + * message. After a handler has been registered, all incoming data will be sent to the handler in the form of + * [[akka.io.Tcp.Received]] messages. To write data to the connection, a [[akka.io.Tcp.Write]] message should be sent + * to the Connection actor. + * + * If the connect request is rejected because the Tcp system is not able to register more channels (see the nr-of-selectors + * and max-channels configuration options in the akka.io.tcp section of the configuration) the sender will be notified + * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. + * + */ +class TcpManager extends Actor { + val settings = Tcp(context.system).Settings + + val selectorPool = context.actorOf(Props.empty.withRouter(RandomRouter(settings.NrOfSelectors))) + + def receive = { + case c: Connect ⇒ selectorPool forward c + case b: Bind ⇒ selectorPool forward b + case Reject(command, commander) ⇒ commander ! CommandFailed(command) + } +}