Merge pull request #18369 from agolubev/agolubev-#18099-FAILED-TcpSpec

+str #18099 FAILED: TcpSpec
This commit is contained in:
drewhk 2015-09-02 11:34:40 +02:00
commit 36b4ae0efc

View file

@ -15,7 +15,7 @@ import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException
import akka.util.{ ByteString, Helpers }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper {
@ -503,7 +503,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
"shut down properly even if some accepted connection Flows have not been subscribed to" in assertAllStagesStopped {
val address = temporaryServerAddress()
val takeTwoAndDropSecond = Flow[IncomingConnection].grouped(2).take(1).map(_.head)
val firstClientConnected = Promise[Unit]()
val takeTwoAndDropSecond = Flow[IncomingConnection].map(conn {
firstClientConnected.trySuccess()
conn
}).grouped(2).take(1).map(_.head)
Tcp().bind(address.getHostName, address.getPort)
.via(takeTwoAndDropSecond)
.runForeach(_.flow.join(Flow[ByteString]).run())
@ -513,6 +517,9 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
.fold(0)(_ + _.size).toMat(Sink.head)(Keep.right)
val total = folder.run()
awaitAssert(firstClientConnected.future, 2.seconds)
val rejected = folder.run()
Await.result(total, 3.seconds) should ===(1000)