Merge pull request #19138 from akka/wip-18988-stabilize-AcknowledgeSinkSpec-√
=str - 18988 - Make AcknowledgeSinkSpec more robust
This commit is contained in:
commit
cdbec3bb55
1 changed files with 8 additions and 15 deletions
|
|
@ -21,18 +21,15 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
|||
|
||||
val noMsgTimeout = 300.millis
|
||||
|
||||
def assertSuccess(value: Any, fb: Future[Option[Any]]): Unit =
|
||||
Await.result(fb, 1.second) should be(Some(value))
|
||||
|
||||
"An AcknowledgeSink" must {
|
||||
|
||||
"send the elements as result of future" in assertAllStagesStopped {
|
||||
val queue = Source(List(1, 2, 3)).runWith(Sink.queue(3))
|
||||
assertSuccess(1, queue.pull())
|
||||
assertSuccess(2, queue.pull())
|
||||
assertSuccess(3, queue.pull())
|
||||
queue.pull().pipeTo(testActor)
|
||||
expectMsg(None)
|
||||
val expected = List(Some(1), Some(2), Some(3), None)
|
||||
val queue = Source(expected.flatten).runWith(Sink.queue(3))
|
||||
expected foreach { v ⇒
|
||||
queue.pull() pipeTo testActor
|
||||
expectMsg(v)
|
||||
}
|
||||
}
|
||||
|
||||
"allow to have only one future waiting for result in each point of time" in assertAllStagesStopped {
|
||||
|
|
@ -77,15 +74,11 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
|||
|
||||
"fail future when stream failed" in assertAllStagesStopped {
|
||||
val probe = TestPublisher.manualProbe[Int]()
|
||||
val queue = Source(probe).runWith(Sink.queue(3, 100.milli))
|
||||
val queue = Source(probe).runWith(Sink.queue(3, 100.millis))
|
||||
val sub = probe.expectSubscription()
|
||||
sub.sendError(ex) // potential race condition
|
||||
|
||||
val future = queue.pull()
|
||||
future.onFailure { case e ⇒ e.getClass() should be(classOf[AskTimeoutException]); Unit }
|
||||
future.onSuccess { case _ ⇒ fail() }
|
||||
|
||||
Await.ready(future, 1.second)
|
||||
an[AskTimeoutException] shouldBe thrownBy { Await.result(queue.pull(), 1.second) }
|
||||
}
|
||||
|
||||
"timeout future when stream cannot provide data" in assertAllStagesStopped {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue