=str #16553: Fix TCP stream shutdown scenarios

This commit is contained in:
Endre Sándor Varga 2015-01-26 17:07:54 +01:00
parent ed7a81e94d
commit bec3be8155
3 changed files with 258 additions and 44 deletions

View file

@ -3,6 +3,8 @@
*/
package akka.stream.io
import akka.io.Tcp._
import akka.stream.BindFailedException
import scala.concurrent.Await
@ -70,7 +72,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
}
"half close the connection when output stream is closed" in {
"work when client closes write, then remote closes write" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
@ -79,58 +81,203 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
// Close client side write
tcpWriteProbe.close()
// FIXME: expect PeerClosed on server
serverConnection.expectClosed(PeerClosed)
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Close server side write
serverConnection.confirmedClose()
tcpReadProbe.subscriberProbe.expectComplete()
serverConnection.expectClosed(ConfirmedClosed)
serverConnection.expectTerminated()
}
"stop reading when the input stream is cancelled" in {
val server = new Server()
"work when remote closes write, then client closes write" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
tcpReadProbe.close()
// FIXME: expect PeerClosed on server
// Server can still write
serverConnection.write(testData)
tcpReadProbe.subscriberProbe.expectNoMsg(1.second)
serverConnection.read(5)
tcpReadProbe.read(5) should be(testData)
// Close server side write
serverConnection.confirmedClose()
tcpReadProbe.subscriberProbe.expectComplete()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
// Close client side write
tcpWriteProbe.close()
serverConnection.expectClosed(ConfirmedClosed)
serverConnection.expectTerminated()
}
"keep write side open when remote half-closes" in {
val server = new Server()
"work when client closes read, then client closes write" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// FIXME: here (and above tests) add a chitChat() method ensuring this works even after prior communication
// there should be a chitchat and non-chitchat version
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Close client side read
tcpReadProbe.tcpReadSubscription.cancel()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
// Close client side write
tcpWriteProbe.close()
// Need a write on the server side to detect the close event
serverConnection.write(testData)
serverConnection.write(testData)
serverConnection.expectClosed(_.isErrorClosed)
serverConnection.expectTerminated()
}
"work when client closes write, then client closes read" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
// Close client side write
tcpWriteProbe.close()
serverConnection.expectClosed(PeerClosed)
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Close client side read
tcpReadProbe.tcpReadSubscription.cancel()
// Need a write on the server side to detect the close event
serverConnection.write(testData)
serverConnection.write(testData)
serverConnection.expectClosed(_.isErrorClosed)
serverConnection.expectTerminated()
}
"work when client closes read, then server closes write, then client closes write" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Close client side read
tcpReadProbe.tcpReadSubscription.cancel()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
serverConnection.confirmedClose()
tcpReadProbe.subscriberProbe.expectCompletedOrSubscriptionFollowedByComplete()
serverConnection.read(5)
tcpWriteProbe.write(testData)
serverConnection.waitRead() should be(testData)
// Close client side write
tcpWriteProbe.close()
// FIXME: expect closed event
serverConnection.expectClosed(ConfirmedClosed)
serverConnection.expectTerminated()
}
"shut down both streams when connection is completely closed" in {
"shut everything down if client signals error" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
// Cause error
tcpWriteProbe.tcpWriteSubscription.sendError(new IllegalStateException("test"))
tcpReadProbe.subscriberProbe.expectError()
serverConnection.expectClosed(_.isErrorClosed)
serverConnection.expectTerminated()
}
"shut everything down if client signals error after remote has closed write" in {
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// Server can still write
serverConnection.write(testData)
tcpReadProbe.read(5) should be(testData)
// Close remote side write
serverConnection.confirmedClose()
tcpReadProbe.subscriberProbe.expectComplete()
// Client can still write
tcpWriteProbe.write(testData)
serverConnection.read(5)
serverConnection.waitRead() should be(testData)
tcpWriteProbe.tcpWriteSubscription.sendError(new IllegalStateException("test"))
serverConnection.expectClosed(_.isErrorClosed)
serverConnection.expectTerminated()
}
"shut down both streams when connection is aborted remotely" in {
// Client gets a PeerClosed event and does not know that the write side is also closed
val testData = ByteString(1, 2, 3, 4, 5)
val server = new Server()
@ -144,10 +291,8 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
serverConnection.abort()
tcpReadProbe.subscriberProbe.expectErrorOrSubscriptionFollowedByError()
tcpWriteProbe.tcpWriteSubscription.expectCancellation()
}
"close the connection when input stream and oputput streams are closed" in {
pending
serverConnection.expectTerminated()
}
"materialize correctly when used in multiple flows" in {

View file

@ -4,6 +4,7 @@
package akka.stream.io
import akka.actor.{ Actor, ActorRef, Props }
import akka.io.Tcp.{ ResumeReading, Register, ConnectionClosed, Closed }
import akka.io.{ IO, Tcp }
import akka.stream.testkit.StreamTestKit
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
@ -18,6 +19,11 @@ object TcpHelper {
case class ClientRead(count: Int, readTo: ActorRef)
case class ClientClose(cmd: Tcp.CloseCommand)
// FIXME: Workaround object just to force a ResumeReading that will poll for a possibly pending close event
// See https://github.com/akka/akka/issues/16552
// remove this and corresponding code path once above is fixed
case class PingClose(requester: ActorRef)
case object WriteAck extends Tcp.Event
def testClientProps(connection: ActorRef): Props =
@ -66,7 +72,12 @@ object TcpHelper {
toRead = 0
readTo = context.system.deadLetters
} else connection ! Tcp.ResumeReading
case PingClose(requester)
readTo = requester
connection ! ResumeReading
case c: ConnectionClosed
readTo ! c
if (!c.isPeerClosed) context.stop(self)
case ClientClose(cmd)
if (!writePending) connection ! cmd
else closeAfterWrite = Some(cmd)
@ -118,6 +129,7 @@ trait TcpHelper { this: TestKitBase ⇒
class ServerConnection(val connectionActor: ActorRef) {
val connectionProbe = TestProbe()
def write(bytes: ByteString): Unit = connectionActor ! ClientWrite(bytes)
def read(count: Int): Unit = connectionActor ! ClientRead(count, connectionProbe.ref)
@ -126,6 +138,21 @@ trait TcpHelper { this: TestKitBase ⇒
def confirmedClose(): Unit = connectionActor ! ClientClose(Tcp.ConfirmedClose)
def close(): Unit = connectionActor ! ClientClose(Tcp.Close)
def abort(): Unit = connectionActor ! ClientClose(Tcp.Abort)
def expectClosed(expected: ConnectionClosed): Unit = expectClosed(_ == expected)
def expectClosed(p: (ConnectionClosed) Boolean): Unit = {
connectionActor ! PingClose(connectionProbe.ref)
connectionProbe.fishForMessage() {
case c: ConnectionClosed if p(c) true
case other false
}
}
def expectTerminated(): Unit = {
connectionProbe.watch(connectionActor)
connectionProbe.expectTerminated(connectionActor)
}
}
class TcpReadProbe() {

View file

@ -7,7 +7,7 @@ import java.net.InetSocketAddress
import akka.io.{ IO, Tcp }
import scala.concurrent.Promise
import scala.util.control.NoStackTrace
import akka.actor.{ ActorRefFactory, Actor, Props, ActorRef, Status }
import akka.actor._
import akka.util.ByteString
import akka.io.Tcp._
import akka.stream.ActorFlowMaterializerSettings
@ -64,22 +64,17 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
def handleRead: Receive = {
case Received(data)
pendingElement = data
readPump.pump()
case Closed
closed = true
tcpOutputs.complete()
writePump.pump()
readPump.pump()
if (closed) connection ! ResumeReading
else {
pendingElement = data
readPump.pump()
}
case ConfirmedClosed
closed = true
cancelWithoutTcpClose()
readPump.pump()
case PeerClosed
closed = true
cancelWithoutTcpClose()
readPump.pump()
case ErrorClosed(cause) fail(new StreamTcpException(s"The connection closed with error $cause"))
case CommandFailed(cmd) fail(new StreamTcpException(s"Tcp command [$cmd] failed"))
case Aborted fail(new StreamTcpException("The connection has been aborted"))
}
override def inputsAvailable: Boolean = pendingElement ne null
@ -87,8 +82,23 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
override def isClosed: Boolean = closed
override def cancel(): Unit = {
closed = true
pendingElement = null
if (!closed) {
closed = true
pendingElement = null
if (connection ne null) {
if (tcpOutputs.isClosed)
connection ! Abort
else
connection ! ResumeReading
}
}
}
def cancelWithoutTcpClose(): Unit = {
if (!closed) {
closed = true
pendingElement = null
}
}
override def dequeueInputElement(): Any = {
@ -105,6 +115,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
private var pendingDemand = true
private var connection: ActorRef = _
def isClosed: Boolean = closed
private def initialized: Boolean = connection ne null
def setConnection(c: ActorRef): Unit = {
@ -122,14 +133,19 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
}
override def isClosed: Boolean = closed
override def cancel(e: Throwable): Unit = {
if (!closed && initialized) connection ! Abort
closed = true
}
override def complete(): Unit = {
if (!closed && initialized) connection ! ConfirmedClose
closed = true
if (!closed && initialized) {
closed = true
if (tcpInputs.isClosed)
connection ! Close
else
connection ! ConfirmedClose
}
}
override def enqueueOutputElement(elem: Any): Unit = {
connection ! Write(elem.asInstanceOf[ByteString], WriteAck)
@ -149,6 +165,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
override protected def pumpFinished(): Unit = {
tcpOutputs.complete()
primaryInputs.cancel()
tryShutdown()
}
override protected def pumpFailed(e: Throwable): Unit = fail(e)
@ -176,7 +193,22 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
}
def activeReceive =
primaryInputs.subreceive orElse primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive
primaryInputs.subreceive orElse
primaryOutputs.subreceive orElse
tcpInputs.subreceive orElse
tcpOutputs.subreceive orElse
commonCloseHandling
def commonCloseHandling: Receive = {
case Closed
tcpInputs.cancel()
tcpOutputs.complete()
writePump.pump()
readPump.pump()
case ErrorClosed(cause) fail(new StreamTcpException(s"The connection closed with error $cause"))
case CommandFailed(cmd) fail(new StreamTcpException(s"Tcp command [$cmd] failed"))
case Aborted fail(new StreamTcpException("The connection has been aborted"))
}
readPump.nextPhase(readPump.running)
writePump.nextPhase(writePump.running)
@ -190,8 +222,18 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
primaryOutputs.cancel(e)
}
def tryShutdown(): Unit = if (primaryInputs.isClosed && tcpInputs.isClosed && tcpOutputs.isClosed) context.stop(self)
def tryShutdown(): Unit =
if (primaryInputs.isClosed && tcpInputs.isClosed && tcpOutputs.isClosed)
context.stop(self)
override def postStop(): Unit = {
// Close if it has not yet been done
tcpInputs.cancel()
tcpOutputs.complete()
primaryInputs.cancel()
primaryOutputs.complete()
super.postStop() // Remember, we have a Stash
}
}
/**