diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 6044a56015..f6029d4cc4 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -68,7 +68,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } "An outgoing connection" must { - info("Connecition reset by peer message expected is " + ConnectionResetByPeerMessage) + info("Connection reset by peer message expected is " + ConnectionResetByPeerMessage) info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix) // common behavior @@ -389,11 +389,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsg(PeerClosed) - connectionHandler.send(connectionActor, Close) assertThisConnectionActorTerminated() } - "report when peer closed the connection but allow further writes and acknowledge normal close" in withEstablishedConnection() { setup ⇒ + "report when peer closed the connection but allow further writes and acknowledge normal close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup ⇒ import setup._ closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side @@ -409,7 +408,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } - "report when peer closed the connection but allow further writes and acknowledge confirmed close" in withEstablishedConnection() { setup ⇒ + "report when peer closed the connection but allow further writes and acknowledge confirmed close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup ⇒ import setup._ closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side @@ -695,7 +694,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } def withEstablishedConnection( setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), - clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ + clientSocketOptions: immutable.Seq[SocketOption] = Nil, + keepOpenOnPeerClosed: Boolean = false)(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ import unregisteredSetup._ val serverSideChannel = acceptServerSideConnection(localServer) @@ -706,7 +706,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) val connectionHandler = TestProbe() - userHandler.send(connectionActor, Register(connectionHandler.ref)) + userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed)) selector.expectMsg(ReadInterest) body { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 9b7326d5a6..cac2ab5a88 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -23,8 +23,6 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr clientHandler.send(clientConnection, Close) clientHandler.expectMsg(Closed) serverHandler.expectMsg(PeerClosed) - serverHandler.send(serverConnection, Close) - serverHandler.expectMsg(Closed) verifyActorTermination(clientConnection) verifyActorTermination(serverConnection) } @@ -54,8 +52,6 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr serverHandler.send(serverConnection, Close) serverHandler.expectMsg(Closed) clientHandler.expectMsg(PeerClosed) - clientHandler.send(clientConnection, Close) - clientHandler.expectMsg(Closed) verifyActorTermination(clientConnection) verifyActorTermination(serverConnection) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index bb2ea3adea..aee19ac24b 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -73,7 +73,7 @@ object Tcp extends ExtensionKey[TcpExt] { backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command - case class Register(handler: ActorRef) extends Command + case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false) extends Command case object Unbind extends Command sealed trait CloseCommand extends Command { diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 3a41c2b480..f6dc9aaf26 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -33,6 +33,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, // Needed to send the ConnectionClosed message in the postStop handler. var closedMessage: CloseInformation = null + var keepOpenOnPeerClosed: Boolean = false + def writePending = pendingWrite ne null def selector = context.parent @@ -41,8 +43,10 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection established, waiting for registration from user handler */ def waitingForRegistration(commander: ActorRef): Receive = { - case Register(handler) ⇒ + case Register(handler, keepOpenOnPeerClosed) ⇒ if (TraceLogging) log.debug("[{}] registered as connection handler", handler) + this.keepOpenOnPeerClosed = keepOpenOnPeerClosed + doRead(handler, None) // immediately try reading context.setReceiveTimeout(Duration.Undefined) @@ -200,7 +204,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case Aborted ⇒ if (TraceLogging) log.debug("Got Abort command. RESETing connection.") doCloseConnection(handler, closeCommander, closedEvent) - case PeerClosed ⇒ + case PeerClosed if keepOpenOnPeerClosed ⇒ // report that peer closed the connection handler ! PeerClosed // used to check if peer already closed its side later diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index e05f61197e..dc711bd84d 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -485,7 +485,10 @@ successful, the listener will be notified with ``ConfirmedClosed``. ``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``. -``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. +``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. Per default, the +connection will then automatically be closed from this endpoint as well. To support half-closed connections set the +``keepOpenOnPeerClosed`` member of the ``Register`` message to ``true`` in which case the connection stays open until +it receives one of the above close commands. ``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed.