Merge pull request #17802 from drewhk/wip-16973-handle-connection-terminated-drewhk

=str #16973: Stream TCP actor should watch IO actor and handle termiation
This commit is contained in:
drewhk 2015-06-23 11:02:21 +02:00
commit e9e133a658
2 changed files with 29 additions and 2 deletions

View file

@ -3,17 +3,18 @@
*/
package akka.stream.io
import akka.actor.{ ActorSystem, Kill }
import akka.stream.scaladsl.Tcp.OutgoingConnection
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import akka.io.Tcp._
import akka.stream.BindFailedException
import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.ByteString
import akka.util.{ Helpers, ByteString }
import akka.stream.scaladsl.Flow
import akka.stream.testkit._
import akka.stream.testkit.Utils._
@ -380,6 +381,25 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
binding.map(_.unbind())
}
"handle when connection actor terminates unexpectedly" in {
val system2 = ActorSystem()
import system2.dispatcher
val mat2 = ActorFlowMaterializer.create(system2)
val serverAddress = temporaryServerAddress()
val binding = Tcp(system2).bindAndHandle(Flow[ByteString], serverAddress.getHostName, serverAddress.getPort)(mat2)
val result = Source.lazyEmpty[ByteString].via(Tcp(system2).outgoingConnection(serverAddress)).runFold(0)(_ + _.size)(mat2)
// Getting rid of existing connection actors by using a blunt instrument
system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / "$a" / "*") ! Kill
a[StreamTcpException] should be thrownBy
Await.result(result, 3.seconds)
binding.map(_.unbind()).foreach(_ system2.shutdown())
}
}
"TCP listen stream" must {
@ -438,6 +458,10 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
}
"bind and unbind correctly" in {
if (Helpers.isWindows) {
info("On Windows unbinding is not immediate")
pending
}
val address = temporaryServerAddress()
val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]()
val bind = Tcp(system).bind(address.getHostName, address.getPort) // TODO getHostString in Java7

View file

@ -229,6 +229,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
commonCloseHandling
def commonCloseHandling: Receive = {
case Terminated(_) fail(new StreamTcpException("The connection actor has terminated. Stopping now."))
case Closed
tcpInputs.cancel()
tcpOutputs.complete()
@ -272,6 +273,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
private[akka] class InboundTcpStreamActor(
val connection: ActorRef, _halfClose: Boolean, _settings: ActorFlowMaterializerSettings)
extends TcpStreamActor(_settings, _halfClose) {
context.watch(connection)
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
tcpInputs.setConnection(connection)
@ -302,6 +304,7 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B
def waitConnection(exposedProcessor: Processor[ByteString, ByteString]): Receive = {
case Connected(remoteAddress, localAddress)
val connection = sender()
context.watch(connection)
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
tcpOutputs.setConnection(connection)
tcpInputs.setConnection(connection)