Merge pull request #17917 from drewhk/wip-17854-half-close-abort-drewhk

=str #17854: fullClose should not Abort
This commit is contained in:
drewhk 2015-07-07 14:30:59 +02:00
commit fd67d82619
2 changed files with 36 additions and 37 deletions

View file

@ -346,17 +346,19 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] =
Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right)
val binding = Tcp()
.bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false)
.toMat(Sink.foreach(_.flow.join(writeButIgnoreRead).run()))(Keep.left).run()
val binding =
Await.result(
Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn
conn.flow.join(writeButIgnoreRead).run()
})(Keep.left).run(), 3.seconds)
val result = Source.repeat(ByteString("client data"))
val result = Source.lazyEmpty[ByteString]
.via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort))
.runFold(ByteString.empty)(_ ++ _)
val r: ByteString = Await.result(result, 3.seconds)
r should ===(ByteString("Early response"))
binding.foreach(_.unbind())
Await.result(result, 3.seconds) should ===(ByteString("Early response"))
binding.unbind()
}
"Echo should work even if server is in full close mode" in {
@ -364,9 +366,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
val serverAddress = temporaryServerAddress()
val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn
conn.flow.join(Flow[ByteString]).run()
})(Keep.left).run()
val binding =
Await.result(
Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn
conn.flow.join(Flow[ByteString]).run()
})(Keep.left).run(), 3.seconds)
val result = Source(immutable.Iterable.fill(10000)(ByteString(0)))
.via(Tcp().outgoingConnection(serverAddress, halfClose = true))
@ -374,7 +378,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
Await.result(result, 3.seconds) should ===(10000)
binding.foreach(_.unbind())
binding.unbind()
}
"handle when connection actor terminates unexpectedly" in {

View file

@ -75,10 +75,10 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti
readPump.pump()
}
case ConfirmedClosed
cancelWithoutTcpClose()
cancel()
readPump.pump()
case PeerClosed
cancelWithoutTcpClose()
cancel()
readPump.pump()
}
@ -90,19 +90,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti
if (!closed) {
closed = true
pendingElement = null
if (connection ne null) {
if (tcpOutputs.isClosed)
connection ! Abort
else
connection ! ResumeReading
}
}
}
def cancelWithoutTcpClose(): Unit = {
if (!closed) {
closed = true
pendingElement = null
if (!tcpOutputs.isFlushed && (connection ne null)) connection ! ResumeReading
}
}
@ -154,17 +142,24 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti
override def complete(): Unit = {
if (!closed && initialized) {
closed = true
if (tcpInputs.isClosed && (halfClose || lastWriteAcked)) {
// We can immediately close if
// - half close mode, and read size already finished
// - full close mode, and last write has been acked
//
// if in full close mode, and has a non-acknowledged write, we will do the closing in handleWrite
// when the Ack arrives
connection ! Close
tryShutdown()
} else
connection ! ConfirmedClose
if (halfClose) {
if (tcpInputs.isClosed) {
// Reading has stopped, either because of cancel, or PeerClosed, just Close now
connection ! Close
tryShutdown()
} else {
// We still read, so we only close the write side
connection ! ConfirmedClose
}
} else {
if (lastWriteAcked) {
// No pending writes, close now
connection ! Close
tryShutdown()
}
// Else wait for final Ack (see handleWrite)
}
}
}