diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala index 0fc72d6f04..1b32bdceec 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala @@ -21,18 +21,15 @@ class AcknowledgeSinkSpec extends AkkaSpec { val noMsgTimeout = 300.millis - def assertSuccess(value: Any, fb: Future[Option[Any]]): Unit = - Await.result(fb, 1.second) should be(Some(value)) - "An AcknowledgeSink" must { "send the elements as result of future" in assertAllStagesStopped { - val queue = Source(List(1, 2, 3)).runWith(Sink.queue(3)) - assertSuccess(1, queue.pull()) - assertSuccess(2, queue.pull()) - assertSuccess(3, queue.pull()) - queue.pull().pipeTo(testActor) - expectMsg(None) + val expected = List(Some(1), Some(2), Some(3), None) + val queue = Source(expected.flatten).runWith(Sink.queue(3)) + expected foreach { v ⇒ + queue.pull() pipeTo testActor + expectMsg(v) + } } "allow to have only one future waiting for result in each point of time" in assertAllStagesStopped { @@ -77,15 +74,11 @@ class AcknowledgeSinkSpec extends AkkaSpec { "fail future when stream failed" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3, 100.milli)) + val queue = Source(probe).runWith(Sink.queue(3, 100.millis)) val sub = probe.expectSubscription() sub.sendError(ex) // potential race condition - val future = queue.pull() - future.onFailure { case e ⇒ e.getClass() should be(classOf[AskTimeoutException]); Unit } - future.onSuccess { case _ ⇒ fail() } - - Await.ready(future, 1.second) + an[AskTimeoutException] shouldBe thrownBy { Await.result(queue.pull(), 1.second) } } "timeout future when stream cannot provide data" in assertAllStagesStopped {