From 42f72324469e37dda005614fa3e4d29094094087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Mon, 22 Jun 2015 15:20:23 +0300 Subject: [PATCH] =str #17172 make assertions on sync zip more robust --- .../src/test/scala/akka/stream/io/TcpSpec.scala | 4 ++-- .../scaladsl/GraphJunctionAttributesSpec.scala | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 4011f5f73c..15bf839726 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -415,8 +415,8 @@ class TcpSpec extends AkkaSpec with TcpHelper { val binding3F = bind.to(Sink(probe3)).run() probe3.expectSubscriptionAndError() - an[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) } - an[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) } + a[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) } + a[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) } // Now unbind first Await.result(binding1.unbind(), 1.second) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala index cd769bf5c6..267fa95289 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -28,7 +28,7 @@ class GraphJunctionAttributesSpec extends AkkaSpec { val source = Source[(SlowTick, List[FastTick])]() { implicit b ⇒ import FlowGraph.Implicits._ - val slow = Source(0.seconds, 100.millis, SlowTick) + val slow = Source(100.millis, 100.millis, SlowTick) val fast = Source(0.seconds, 10.millis, FastTick) val zip = b add Zip[SlowTick, List[FastTick]]().withAttributes(inputBuffer(1, 1)) @@ -39,10 +39,16 @@ class GraphJunctionAttributesSpec extends AkkaSpec { zip.out } - val future = source.grouped(10).runWith(Sink.head) + val future = source + .drop(1) // account for prefetch + .grouped(10) + .runWith(Sink.head) + val fastTicks = Await.result(future, 2.seconds).map(_._2.size) - // FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick - Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil) + // Account for the possibility for the zip to act as a buffer of two. + // If that happens there would be one fast tick for one slow tick in the results. + // More explanation in #16435 + atLeast(8, fastTicks) shouldBe 10 +- 1 } }