Decrease visibility of internals to private[io], clean up imports
This commit is contained in:
parent
79accb810e
commit
570b02f569
9 changed files with 49 additions and 44 deletions
|
|
@ -128,7 +128,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// COMMANDS
|
/// COMMANDS
|
||||||
sealed trait Command
|
trait Command
|
||||||
|
|
||||||
case class Connect(remoteAddress: InetSocketAddress,
|
case class Connect(remoteAddress: InetSocketAddress,
|
||||||
localAddress: Option[InetSocketAddress] = None,
|
localAddress: Option[InetSocketAddress] = None,
|
||||||
|
|
@ -180,20 +180,6 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
case object ConfirmedClosed extends ConnectionClosed
|
case object ConfirmedClosed extends ConnectionClosed
|
||||||
case object PeerClosed extends ConnectionClosed
|
case object PeerClosed extends ConnectionClosed
|
||||||
case class ErrorClose(cause: String) extends ConnectionClosed
|
case class ErrorClose(cause: String) extends ConnectionClosed
|
||||||
|
|
||||||
/// INTERNAL
|
|
||||||
case class RegisterOutgoingConnection(channel: SocketChannel)
|
|
||||||
case class RegisterServerSocketChannel(channel: ServerSocketChannel)
|
|
||||||
case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef,
|
|
||||||
options: Traversable[SocketOption]) extends Command
|
|
||||||
case class Retry(command: Command, retriesLeft: Int) { require(retriesLeft >= 0) }
|
|
||||||
case object ChannelConnectable
|
|
||||||
case object ChannelAcceptable
|
|
||||||
case object ChannelReadable
|
|
||||||
case object ChannelWritable
|
|
||||||
case object AcceptInterest
|
|
||||||
case object ReadInterest
|
|
||||||
case object WriteInterest
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
||||||
|
|
|
||||||
|
|
@ -7,21 +7,21 @@ package akka.io
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.nio.channels.SocketChannel
|
import java.nio.channels.SocketChannel
|
||||||
|
import java.nio.ByteBuffer
|
||||||
|
import scala.annotation.tailrec
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.collection.immutable
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import Tcp._
|
import Tcp._
|
||||||
import annotation.tailrec
|
import TcpSelector._
|
||||||
import java.nio.ByteBuffer
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
|
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
|
||||||
*/
|
*/
|
||||||
abstract class TcpConnection(val selector: ActorRef,
|
private[io] abstract class TcpConnection(val selector: ActorRef,
|
||||||
val channel: SocketChannel,
|
val channel: SocketChannel,
|
||||||
val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool {
|
val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool {
|
||||||
import tcp.Settings._
|
import tcp.Settings._
|
||||||
var pendingWrite: PendingWrite = null
|
var pendingWrite: PendingWrite = null
|
||||||
|
|
||||||
|
|
@ -277,4 +277,4 @@ abstract class TcpConnection(val selector: ActorRef,
|
||||||
private[TcpConnection] case class CloseInformation(
|
private[TcpConnection] case class CloseInformation(
|
||||||
notificationsTo: Set[ActorRef],
|
notificationsTo: Set[ActorRef],
|
||||||
closedEvent: ConnectionClosed)
|
closedEvent: ConnectionClosed)
|
||||||
}
|
}
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
package akka.io
|
package akka.io
|
||||||
|
|
||||||
import java.nio.channels.SocketChannel
|
import java.nio.channels.SocketChannel
|
||||||
import scala.collection.immutable
|
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import Tcp.SocketOption
|
import Tcp.SocketOption
|
||||||
|
|
||||||
|
|
@ -13,11 +12,11 @@ import Tcp.SocketOption
|
||||||
* An actor handling the connection state machine for an incoming, already connected
|
* An actor handling the connection state machine for an incoming, already connected
|
||||||
* SocketChannel.
|
* SocketChannel.
|
||||||
*/
|
*/
|
||||||
class TcpIncomingConnection(_selector: ActorRef,
|
private[io] class TcpIncomingConnection(_selector: ActorRef,
|
||||||
_channel: SocketChannel,
|
_channel: SocketChannel,
|
||||||
_tcp: TcpExt,
|
_tcp: TcpExt,
|
||||||
handler: ActorRef,
|
handler: ActorRef,
|
||||||
options: Traversable[SocketOption])
|
options: Traversable[SocketOption])
|
||||||
extends TcpConnection(_selector, _channel, _tcp) {
|
extends TcpConnection(_selector, _channel, _tcp) {
|
||||||
|
|
||||||
context.watch(handler) // sign death pact
|
context.watch(handler) // sign death pact
|
||||||
|
|
|
||||||
|
|
@ -7,18 +7,18 @@ package akka.io
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.channels.ServerSocketChannel
|
import java.nio.channels.ServerSocketChannel
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.actor.{ ActorLogging, ActorRef, Actor }
|
import akka.actor.{ ActorLogging, ActorRef, Actor }
|
||||||
|
import TcpSelector._
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
class TcpListener(selector: ActorRef,
|
private[io] class TcpListener(selector: ActorRef,
|
||||||
handler: ActorRef,
|
handler: ActorRef,
|
||||||
endpoint: InetSocketAddress,
|
endpoint: InetSocketAddress,
|
||||||
backlog: Int,
|
backlog: Int,
|
||||||
bindCommander: ActorRef,
|
bindCommander: ActorRef,
|
||||||
settings: TcpExt#Settings,
|
settings: TcpExt#Settings,
|
||||||
options: Traversable[SocketOption]) extends Actor with ActorLogging {
|
options: Traversable[SocketOption]) extends Actor with ActorLogging {
|
||||||
|
|
||||||
context.watch(handler) // sign death pact
|
context.watch(handler) // sign death pact
|
||||||
val channel = {
|
val channel = {
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ import Tcp._
|
||||||
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
|
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class TcpManager(tcp: TcpExt) extends Actor with ActorLogging {
|
private[io] class TcpManager(tcp: TcpExt) extends Actor with ActorLogging {
|
||||||
|
|
||||||
val selectorPool = context.actorOf(
|
val selectorPool = context.actorOf(
|
||||||
props = Props(new TcpSelector(self, tcp)).withRouter(RandomRouter(tcp.Settings.NrOfSelectors)),
|
props = Props(new TcpSelector(self, tcp)).withRouter(RandomRouter(tcp.Settings.NrOfSelectors)),
|
||||||
|
|
|
||||||
|
|
@ -8,18 +8,19 @@ import java.net.InetSocketAddress
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.nio.channels.SocketChannel
|
import java.nio.channels.SocketChannel
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
|
import TcpSelector._
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An actor handling the connection state machine for an outgoing connection
|
* An actor handling the connection state machine for an outgoing connection
|
||||||
* to be established.
|
* to be established.
|
||||||
*/
|
*/
|
||||||
class TcpOutgoingConnection(_selector: ActorRef,
|
private[io] class TcpOutgoingConnection(_selector: ActorRef,
|
||||||
_tcp: TcpExt,
|
_tcp: TcpExt,
|
||||||
commander: ActorRef,
|
commander: ActorRef,
|
||||||
remoteAddress: InetSocketAddress,
|
remoteAddress: InetSocketAddress,
|
||||||
localAddress: Option[InetSocketAddress],
|
localAddress: Option[InetSocketAddress],
|
||||||
options: Traversable[SocketOption])
|
options: Traversable[SocketOption])
|
||||||
extends TcpConnection(_selector, TcpOutgoingConnection.newSocketChannel(), _tcp) {
|
extends TcpConnection(_selector, TcpOutgoingConnection.newSocketChannel(), _tcp) {
|
||||||
|
|
||||||
context.watch(commander) // sign death pact
|
context.watch(commander) // sign death pact
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,8 @@ import scala.concurrent.duration._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLogging {
|
private[io] class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLogging {
|
||||||
|
import TcpSelector._
|
||||||
import tcp.Settings._
|
import tcp.Settings._
|
||||||
|
|
||||||
@volatile var childrenKeys = HashMap.empty[String, SelectionKey]
|
@volatile var childrenKeys = HashMap.empty[String, SelectionKey]
|
||||||
|
|
@ -222,4 +223,20 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private[io] object TcpSelector {
|
||||||
|
case class RegisterOutgoingConnection(channel: SocketChannel)
|
||||||
|
case class RegisterServerSocketChannel(channel: ServerSocketChannel)
|
||||||
|
case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef,
|
||||||
|
options: Traversable[SocketOption]) extends Tcp.Command
|
||||||
|
case class Retry(command: Command, retriesLeft: Int) { require(retriesLeft >= 0) }
|
||||||
|
|
||||||
|
case object ChannelConnectable
|
||||||
|
case object ChannelAcceptable
|
||||||
|
case object ChannelReadable
|
||||||
|
case object ChannelWritable
|
||||||
|
case object AcceptInterest
|
||||||
|
case object ReadInterest
|
||||||
|
case object WriteInterest
|
||||||
}
|
}
|
||||||
|
|
@ -18,6 +18,7 @@ import akka.actor.{ PoisonPill, ActorRef, Terminated }
|
||||||
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
|
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import TestUtils._
|
import TestUtils._
|
||||||
|
import TcpSelector._
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") {
|
class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import java.net.Socket
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props }
|
import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props }
|
||||||
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
|
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
|
||||||
|
import TcpSelector._
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue