diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 606ae9337f..3e7173b091 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -3,17 +3,18 @@ */ package akka.stream.io +import akka.actor.{ ActorSystem, Kill } import akka.stream.scaladsl.Tcp.OutgoingConnection import scala.collection.immutable import scala.concurrent.{ Future, Await } import akka.io.Tcp._ -import akka.stream.BindFailedException +import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException } import scala.concurrent.Await import scala.concurrent.duration._ -import akka.util.ByteString +import akka.util.{ Helpers, ByteString } import akka.stream.scaladsl.Flow import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -380,6 +381,25 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- binding.map(_.unbind()) } + "handle when connection actor terminates unexpectedly" in { + val system2 = ActorSystem() + import system2.dispatcher + val mat2 = ActorFlowMaterializer.create(system2) + + val serverAddress = temporaryServerAddress() + val binding = Tcp(system2).bindAndHandle(Flow[ByteString], serverAddress.getHostName, serverAddress.getPort)(mat2) + + val result = Source.lazyEmpty[ByteString].via(Tcp(system2).outgoingConnection(serverAddress)).runFold(0)(_ + _.size)(mat2) + + // Getting rid of existing connection actors by using a blunt instrument + system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / "$a" / "*") ! Kill + + a[StreamTcpException] should be thrownBy + Await.result(result, 3.seconds) + + binding.map(_.unbind()).foreach(_ ⇒ system2.shutdown()) + } + } "TCP listen stream" must { @@ -438,6 +458,10 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- } "bind and unbind correctly" in { + if (Helpers.isWindows) { + info("On Windows unbinding is not immediate") + pending + } val address = temporaryServerAddress() val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val bind = Tcp(system).bind(address.getHostName, address.getPort) // TODO getHostString in Java7 diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 3e5465c784..bf18bbe02b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -229,6 +229,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS commonCloseHandling def commonCloseHandling: Receive = { + case Terminated(_) ⇒ fail(new StreamTcpException("The connection actor has terminated. Stopping now.")) case Closed ⇒ tcpInputs.cancel() tcpOutputs.complete() @@ -272,6 +273,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS private[akka] class InboundTcpStreamActor( val connection: ActorRef, _halfClose: Boolean, _settings: ActorFlowMaterializerSettings) extends TcpStreamActor(_settings, _halfClose) { + context.watch(connection) connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) tcpInputs.setConnection(connection) @@ -302,6 +304,7 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B def waitConnection(exposedProcessor: Processor[ByteString, ByteString]): Receive = { case Connected(remoteAddress, localAddress) ⇒ val connection = sender() + context.watch(connection) connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) tcpOutputs.setConnection(connection) tcpInputs.setConnection(connection)