Tcp: hide support for half-closed connections behind Register.keepOpenOnPeerClosed

This commit is contained in:
Johannes Rudolph 2013-04-07 17:05:22 +02:00
parent fd05cad103
commit e532a77824
5 changed files with 17 additions and 14 deletions

View file

@ -68,7 +68,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} }
"An outgoing connection" must { "An outgoing connection" must {
info("Connecition reset by peer message expected is " + ConnectionResetByPeerMessage) info("Connection reset by peer message expected is " + ConnectionResetByPeerMessage)
info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix) info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix)
// common behavior // common behavior
@ -389,11 +389,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
selector.send(connectionActor, ChannelReadable) selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsg(PeerClosed) connectionHandler.expectMsg(PeerClosed)
connectionHandler.send(connectionActor, Close)
assertThisConnectionActorTerminated() assertThisConnectionActorTerminated()
} }
"report when peer closed the connection but allow further writes and acknowledge normal close" in withEstablishedConnection() { setup "report when peer closed the connection but allow further writes and acknowledge normal close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup
import setup._ import setup._
closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side
@ -409,7 +408,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertThisConnectionActorTerminated() assertThisConnectionActorTerminated()
} }
"report when peer closed the connection but allow further writes and acknowledge confirmed close" in withEstablishedConnection() { setup "report when peer closed the connection but allow further writes and acknowledge confirmed close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup
import setup._ import setup._
closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side
@ -695,7 +694,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} }
def withEstablishedConnection( def withEstablishedConnection(
setServerSocketOptions: ServerSocketChannel Unit = _ (), setServerSocketOptions: ServerSocketChannel Unit = _ (),
clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup clientSocketOptions: immutable.Seq[SocketOption] = Nil,
keepOpenOnPeerClosed: Boolean = false)(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup
import unregisteredSetup._ import unregisteredSetup._
val serverSideChannel = acceptServerSideConnection(localServer) val serverSideChannel = acceptServerSideConnection(localServer)
@ -706,7 +706,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
val connectionHandler = TestProbe() val connectionHandler = TestProbe()
userHandler.send(connectionActor, Register(connectionHandler.ref)) userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed))
selector.expectMsg(ReadInterest) selector.expectMsg(ReadInterest)
body { body {

View file

@ -23,8 +23,6 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr
clientHandler.send(clientConnection, Close) clientHandler.send(clientConnection, Close)
clientHandler.expectMsg(Closed) clientHandler.expectMsg(Closed)
serverHandler.expectMsg(PeerClosed) serverHandler.expectMsg(PeerClosed)
serverHandler.send(serverConnection, Close)
serverHandler.expectMsg(Closed)
verifyActorTermination(clientConnection) verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection) verifyActorTermination(serverConnection)
} }
@ -54,8 +52,6 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr
serverHandler.send(serverConnection, Close) serverHandler.send(serverConnection, Close)
serverHandler.expectMsg(Closed) serverHandler.expectMsg(Closed)
clientHandler.expectMsg(PeerClosed) clientHandler.expectMsg(PeerClosed)
clientHandler.send(clientConnection, Close)
clientHandler.expectMsg(Closed)
verifyActorTermination(clientConnection) verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection) verifyActorTermination(serverConnection)

View file

@ -73,7 +73,7 @@ object Tcp extends ExtensionKey[TcpExt] {
backlog: Int = 100, backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil) extends Command options: immutable.Traversable[SocketOption] = Nil) extends Command
case class Register(handler: ActorRef) extends Command case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false) extends Command
case object Unbind extends Command case object Unbind extends Command
sealed trait CloseCommand extends Command { sealed trait CloseCommand extends Command {

View file

@ -33,6 +33,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
// Needed to send the ConnectionClosed message in the postStop handler. // Needed to send the ConnectionClosed message in the postStop handler.
var closedMessage: CloseInformation = null var closedMessage: CloseInformation = null
var keepOpenOnPeerClosed: Boolean = false
def writePending = pendingWrite ne null def writePending = pendingWrite ne null
def selector = context.parent def selector = context.parent
@ -41,8 +43,10 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
/** connection established, waiting for registration from user handler */ /** connection established, waiting for registration from user handler */
def waitingForRegistration(commander: ActorRef): Receive = { def waitingForRegistration(commander: ActorRef): Receive = {
case Register(handler) case Register(handler, keepOpenOnPeerClosed)
if (TraceLogging) log.debug("[{}] registered as connection handler", handler) if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
this.keepOpenOnPeerClosed = keepOpenOnPeerClosed
doRead(handler, None) // immediately try reading doRead(handler, None) // immediately try reading
context.setReceiveTimeout(Duration.Undefined) context.setReceiveTimeout(Duration.Undefined)
@ -200,7 +204,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
case Aborted case Aborted
if (TraceLogging) log.debug("Got Abort command. RESETing connection.") if (TraceLogging) log.debug("Got Abort command. RESETing connection.")
doCloseConnection(handler, closeCommander, closedEvent) doCloseConnection(handler, closeCommander, closedEvent)
case PeerClosed case PeerClosed if keepOpenOnPeerClosed
// report that peer closed the connection // report that peer closed the connection
handler ! PeerClosed handler ! PeerClosed
// used to check if peer already closed its side later // used to check if peer already closed its side later

View file

@ -485,7 +485,10 @@ successful, the listener will be notified with ``ConfirmedClosed``.
``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending ``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending
writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``. writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``.
``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. ``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. Per default, the
connection will then automatically be closed from this endpoint as well. To support half-closed connections set the
``keepOpenOnPeerClosed`` member of the ``Register`` message to ``true`` in which case the connection stays open until
it receives one of the above close commands.
``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. ``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed.