diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 297329f465..e2e33dc720 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -15,7 +15,7 @@ import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException import akka.util.{ ByteString, Helpers } import scala.collection.immutable -import scala.concurrent.Await +import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper { @@ -503,7 +503,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- "shut down properly even if some accepted connection Flows have not been subscribed to" in assertAllStagesStopped { val address = temporaryServerAddress() - val takeTwoAndDropSecond = Flow[IncomingConnection].grouped(2).take(1).map(_.head) + val firstClientConnected = Promise[Unit]() + val takeTwoAndDropSecond = Flow[IncomingConnection].map(conn ⇒ { + firstClientConnected.trySuccess() + conn + }).grouped(2).take(1).map(_.head) Tcp().bind(address.getHostName, address.getPort) .via(takeTwoAndDropSecond) .runForeach(_.flow.join(Flow[ByteString]).run()) @@ -513,6 +517,9 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- .fold(0)(_ + _.size).toMat(Sink.head)(Keep.right) val total = folder.run() + + awaitAssert(firstClientConnected.future, 2.seconds) + val rejected = folder.run() Await.result(total, 3.seconds) should ===(1000)