Merge pull request #17800 from 2m/wip-sync-zip

=str #17172 make assertions on sync zip more robust
This commit is contained in:
Konrad Malawski 2015-06-22 17:47:58 +02:00
commit e7027b3974
2 changed files with 12 additions and 6 deletions

View file

@ -454,8 +454,8 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
val binding3F = bind.to(Sink(probe3)).run()
probe3.expectSubscriptionAndError()
an[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) }
an[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) }
a[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) }
a[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) }
// Now unbind first
Await.result(binding1.unbind(), 1.second)

View file

@ -28,7 +28,7 @@ class GraphJunctionAttributesSpec extends AkkaSpec {
val source = Source[(SlowTick, List[FastTick])]() { implicit b
import FlowGraph.Implicits._
val slow = Source(0.seconds, 100.millis, SlowTick)
val slow = Source(100.millis, 100.millis, SlowTick)
val fast = Source(0.seconds, 10.millis, FastTick)
val zip = b add Zip[SlowTick, List[FastTick]]().withAttributes(inputBuffer(1, 1))
@ -39,10 +39,16 @@ class GraphJunctionAttributesSpec extends AkkaSpec {
zip.out
}
val future = source.grouped(10).runWith(Sink.head)
val future = source
.drop(1) // account for prefetch
.grouped(10)
.runWith(Sink.head)
val fastTicks = Await.result(future, 2.seconds).map(_._2.size)
// FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick
Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil)
// Account for the possibility for the zip to act as a buffer of two.
// If that happens there would be one fast tick for one slow tick in the results.
// More explanation in #16435
atLeast(8, fastTicks) shouldBe 10 +- 1
}
}