From 73db387100f0b1564a286a48e487780f222a8a12 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Mon, 31 Aug 2015 14:00:26 -0400 Subject: [PATCH] +str #18099 FAILED: TcpSpec --- .../src/test/scala/akka/stream/io/TcpSpec.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 297329f465..e2e33dc720 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -15,7 +15,7 @@ import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException import akka.util.{ ByteString, Helpers } import scala.collection.immutable -import scala.concurrent.Await +import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper { @@ -503,7 +503,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- "shut down properly even if some accepted connection Flows have not been subscribed to" in assertAllStagesStopped { val address = temporaryServerAddress() - val takeTwoAndDropSecond = Flow[IncomingConnection].grouped(2).take(1).map(_.head) + val firstClientConnected = Promise[Unit]() + val takeTwoAndDropSecond = Flow[IncomingConnection].map(conn ⇒ { + firstClientConnected.trySuccess() + conn + }).grouped(2).take(1).map(_.head) Tcp().bind(address.getHostName, address.getPort) .via(takeTwoAndDropSecond) .runForeach(_.flow.join(Flow[ByteString]).run()) @@ -513,6 +517,9 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- .fold(0)(_ + _.size).toMat(Sink.head)(Keep.right) val total = folder.run() + + awaitAssert(firstClientConnected.future, 2.seconds) + val rejected = folder.run() Await.result(total, 3.seconds) should ===(1000)