From 570b02f569d7ca62b3feb2f5916e869201c0208c Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 22 Jan 2013 14:10:36 +0100 Subject: [PATCH] Decrease visibility of internals to `private[io]`, clean up imports --- akka-io/src/main/scala/akka/io/Tcp.scala | 16 +--------------- .../main/scala/akka/io/TcpConnection.scala | 14 +++++++------- .../scala/akka/io/TcpIncomingConnection.scala | 11 +++++------ .../src/main/scala/akka/io/TcpListener.scala | 16 ++++++++-------- .../src/main/scala/akka/io/TcpManager.scala | 2 +- .../scala/akka/io/TcpOutgoingConnection.scala | 13 +++++++------ .../src/main/scala/akka/io/TcpSelector.scala | 19 ++++++++++++++++++- .../scala/akka/io/TcpConnectionSpec.scala | 1 + .../test/scala/akka/io/TcpListenerSpec.scala | 1 + 9 files changed, 49 insertions(+), 44 deletions(-) diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index 7df771caf1..e9b7a07409 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -128,7 +128,7 @@ object Tcp extends ExtensionKey[TcpExt] { } /// COMMANDS - sealed trait Command + trait Command case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, @@ -180,20 +180,6 @@ object Tcp extends ExtensionKey[TcpExt] { case object ConfirmedClosed extends ConnectionClosed case object PeerClosed extends ConnectionClosed case class ErrorClose(cause: String) extends ConnectionClosed - - /// INTERNAL - case class RegisterOutgoingConnection(channel: SocketChannel) - case class RegisterServerSocketChannel(channel: ServerSocketChannel) - case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef, - options: Traversable[SocketOption]) extends Command - case class Retry(command: Command, retriesLeft: Int) { require(retriesLeft >= 0) } - case object ChannelConnectable - case object ChannelAcceptable - case object ChannelReadable - case object ChannelWritable - case object AcceptInterest - case object ReadInterest - case object WriteInterest } class TcpExt(system: ExtendedActorSystem) extends IO.Extension { diff --git a/akka-io/src/main/scala/akka/io/TcpConnection.scala b/akka-io/src/main/scala/akka/io/TcpConnection.scala index 632b28bd59..f6f1d099ee 100644 --- a/akka-io/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpConnection.scala @@ -7,21 +7,21 @@ package akka.io import java.net.InetSocketAddress import java.io.IOException import java.nio.channels.SocketChannel +import java.nio.ByteBuffer +import scala.annotation.tailrec import scala.util.control.NonFatal -import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ import akka.util.ByteString import Tcp._ -import annotation.tailrec -import java.nio.ByteBuffer +import TcpSelector._ /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. */ -abstract class TcpConnection(val selector: ActorRef, - val channel: SocketChannel, - val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool { +private[io] abstract class TcpConnection(val selector: ActorRef, + val channel: SocketChannel, + val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool { import tcp.Settings._ var pendingWrite: PendingWrite = null @@ -277,4 +277,4 @@ abstract class TcpConnection(val selector: ActorRef, private[TcpConnection] case class CloseInformation( notificationsTo: Set[ActorRef], closedEvent: ConnectionClosed) -} +} \ No newline at end of file diff --git a/akka-io/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-io/src/main/scala/akka/io/TcpIncomingConnection.scala index 5a4a9a5d94..503c810f44 100644 --- a/akka-io/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -5,7 +5,6 @@ package akka.io import java.nio.channels.SocketChannel -import scala.collection.immutable import akka.actor.ActorRef import Tcp.SocketOption @@ -13,11 +12,11 @@ import Tcp.SocketOption * An actor handling the connection state machine for an incoming, already connected * SocketChannel. */ -class TcpIncomingConnection(_selector: ActorRef, - _channel: SocketChannel, - _tcp: TcpExt, - handler: ActorRef, - options: Traversable[SocketOption]) +private[io] class TcpIncomingConnection(_selector: ActorRef, + _channel: SocketChannel, + _tcp: TcpExt, + handler: ActorRef, + options: Traversable[SocketOption]) extends TcpConnection(_selector, _channel, _tcp) { context.watch(handler) // sign death pact diff --git a/akka-io/src/main/scala/akka/io/TcpListener.scala b/akka-io/src/main/scala/akka/io/TcpListener.scala index fac5b90c9c..79e37505ad 100644 --- a/akka-io/src/main/scala/akka/io/TcpListener.scala +++ b/akka-io/src/main/scala/akka/io/TcpListener.scala @@ -7,18 +7,18 @@ package akka.io import java.net.InetSocketAddress import java.nio.channels.ServerSocketChannel import scala.annotation.tailrec -import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.{ ActorLogging, ActorRef, Actor } +import TcpSelector._ import Tcp._ -class TcpListener(selector: ActorRef, - handler: ActorRef, - endpoint: InetSocketAddress, - backlog: Int, - bindCommander: ActorRef, - settings: TcpExt#Settings, - options: Traversable[SocketOption]) extends Actor with ActorLogging { +private[io] class TcpListener(selector: ActorRef, + handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int, + bindCommander: ActorRef, + settings: TcpExt#Settings, + options: Traversable[SocketOption]) extends Actor with ActorLogging { context.watch(handler) // sign death pact val channel = { diff --git a/akka-io/src/main/scala/akka/io/TcpManager.scala b/akka-io/src/main/scala/akka/io/TcpManager.scala index 6f2046d75f..0ef1ca3e52 100644 --- a/akka-io/src/main/scala/akka/io/TcpManager.scala +++ b/akka-io/src/main/scala/akka/io/TcpManager.scala @@ -43,7 +43,7 @@ import Tcp._ * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. * */ -class TcpManager(tcp: TcpExt) extends Actor with ActorLogging { +private[io] class TcpManager(tcp: TcpExt) extends Actor with ActorLogging { val selectorPool = context.actorOf( props = Props(new TcpSelector(self, tcp)).withRouter(RandomRouter(tcp.Settings.NrOfSelectors)), diff --git a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala index dfd5e93771..c3276723b8 100644 --- a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -8,18 +8,19 @@ import java.net.InetSocketAddress import java.io.IOException import java.nio.channels.SocketChannel import akka.actor.ActorRef +import TcpSelector._ import Tcp._ /** * An actor handling the connection state machine for an outgoing connection * to be established. */ -class TcpOutgoingConnection(_selector: ActorRef, - _tcp: TcpExt, - commander: ActorRef, - remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: Traversable[SocketOption]) +private[io] class TcpOutgoingConnection(_selector: ActorRef, + _tcp: TcpExt, + commander: ActorRef, + remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress], + options: Traversable[SocketOption]) extends TcpConnection(_selector, TcpOutgoingConnection.newSocketChannel(), _tcp) { context.watch(commander) // sign death pact diff --git a/akka-io/src/main/scala/akka/io/TcpSelector.scala b/akka-io/src/main/scala/akka/io/TcpSelector.scala index 69f5ff89c9..ec8e70fe9a 100644 --- a/akka-io/src/main/scala/akka/io/TcpSelector.scala +++ b/akka-io/src/main/scala/akka/io/TcpSelector.scala @@ -14,7 +14,8 @@ import scala.concurrent.duration._ import akka.actor._ import Tcp._ -class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLogging { +private[io] class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLogging { + import TcpSelector._ import tcp.Settings._ @volatile var childrenKeys = HashMap.empty[String, SelectionKey] @@ -222,4 +223,20 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin } } } +} + +private[io] object TcpSelector { + case class RegisterOutgoingConnection(channel: SocketChannel) + case class RegisterServerSocketChannel(channel: ServerSocketChannel) + case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef, + options: Traversable[SocketOption]) extends Tcp.Command + case class Retry(command: Command, retriesLeft: Int) { require(retriesLeft >= 0) } + + case object ChannelConnectable + case object ChannelAcceptable + case object ChannelReadable + case object ChannelWritable + case object AcceptInterest + case object ReadInterest + case object WriteInterest } \ No newline at end of file diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 7cdcc4f202..6881a54fb2 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -18,6 +18,7 @@ import akka.actor.{ PoisonPill, ActorRef, Terminated } import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } import akka.util.ByteString import TestUtils._ +import TcpSelector._ import Tcp._ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { diff --git a/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala index 8db5cb0d74..b972431a5c 100644 --- a/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala @@ -8,6 +8,7 @@ import java.net.Socket import scala.concurrent.duration._ import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props } import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } +import TcpSelector._ import Tcp._ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {