diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index d5b3b20bc8..08615d3cc5 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -38,6 +38,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { import system.dispatcher implicit val materializer = ActorFlowMaterializer() + val testConf2: Config = + ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s") + .withFallback(testConf) + val system2 = ActorSystem(getClass.getSimpleName, testConf2) + val materializer2 = ActorFlowMaterializer.create(system2) + "The low-level HTTP infrastructure" should { "properly bind a server" in { @@ -133,12 +139,6 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "log materialization errors in `bindAndHandle`" which { - val testConf2: Config = - ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s") - .withFallback(testConf) - val system2 = ActorSystem(getClass.getSimpleName, testConf2) - import system2.dispatcher - val materializer2 = ActorFlowMaterializer.create(system2) "are triggered in `transform`" in Utils.assertAllStagesStopped { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() @@ -149,7 +149,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { val (_, responseFuture) = Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) - Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + try Await.result(responseFuture, 5.second).status should ===(StatusCodes.InternalServerError) + catch { + case _: StreamTcpException ⇒ + // Also fine, depends on the race between abort and 500, caused by materialization panic which + // tries to tear down everything, but the order is nondeterministic + } }(system2) Await.result(b1.unbind(), 1.second) }(materializer2) @@ -163,7 +168,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { val (_, responseFuture) = Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) - Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + try Await.result(responseFuture, 5.seconds).status should ===(StatusCodes.InternalServerError) + catch { + case _: StreamTcpException ⇒ + // Also fine, depends on the race between abort and 500, caused by materialization panic which + // tries to tear down everything, but the order is nondeterministic + } }(system2) Await.result(b1.unbind(), 1.second) }(materializer2) @@ -276,7 +286,10 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } - override def afterAll() = system.shutdown() + override def afterAll() = { + system.shutdown() + system2.shutdown() + } class TestSetup { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index e73b44e0f2..e9b45a8594 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -8,6 +8,7 @@ import akka.io.Tcp._ import akka.stream.scaladsl.Tcp.IncomingConnection import akka.stream.scaladsl.{Flow, _} import akka.stream.testkit.TestUtils.temporaryServerAddress +import scala.util.control.NonFatal import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException} @@ -392,7 +393,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- a[StreamTcpException] should be thrownBy Await.result(result, 3.seconds) - binding.map(_.unbind()).foreach(_ ⇒ system2.shutdown()) + binding.map(_.unbind()).recover {case NonFatal(_) => ()} foreach(_ ⇒ system2.shutdown()) } } @@ -523,6 +524,8 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- } + + def validateServerClientCommunication(testData: ByteString, serverConnection: ServerConnection, readProbe: TcpReadProbe,