Test coverage for stages using invokeWithFeedback #23953

This commit is contained in:
Johan Andrén 2017-12-04 16:34:16 +01:00 committed by GitHub
parent 891cf30348
commit a380d01337
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 20 deletions

View file

@ -55,7 +55,7 @@ class QueueSinkSpec extends StreamSpec {
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
expectNoMessage(noMsgTimeout)
sub.sendNext(1)
expectMsg(Some(1))
@ -69,7 +69,7 @@ class QueueSinkSpec extends StreamSpec {
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
expectNoMessage(noMsgTimeout)
sub.sendError(ex)
expectMsg(Status.Failure(ex))
@ -84,13 +84,21 @@ class QueueSinkSpec extends StreamSpec {
the[Exception] thrownBy { Await.result(queue.pull(), remainingOrDefault) } should be(ex)
}
"fail future immediately if stream already canceled" in assertAllStagesStopped {
val queue = Source.empty[Int].runWith(Sink.queue())
// race here because no way to observe that queue sink saw termination
awaitAssert({
queue.pull().failed.futureValue shouldBe a[StreamDetachedException]
})
}
"timeout future when stream cannot provide data" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
expectNoMessage(noMsgTimeout)
sub.sendNext(1)
expectMsg(Some(1))
@ -154,7 +162,7 @@ class QueueSinkSpec extends StreamSpec {
expectMsg(Some(1))
queue.pull().pipeTo(testActor)
expectNoMsg(200.millis) // element requested but buffer empty
expectNoMessage(200.millis) // element requested but buffer empty
sub.sendNext(2)
expectMsg(Some(2))

View file

@ -8,17 +8,15 @@ import akka.actor.Status
import akka.pattern.pipe
import akka.stream._
import akka.stream.impl.QueueSource
import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.Utils._
import akka.testkit.TestProbe
import scala.concurrent.duration._
import scala.concurrent._
import akka.Done
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber }
import akka.testkit.TestProbe
import org.scalatest.time.Span
import scala.concurrent._
import scala.concurrent.duration._
class QueueSourceSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
@ -46,7 +44,7 @@ class QueueSourceSpec extends StreamSpec {
}
queue.watchCompletion().pipeTo(testActor)
expectNoMsg(pause)
expectNoMessage(pause)
sub.cancel()
expectMsg(Done)
@ -103,7 +101,7 @@ class QueueSourceSpec extends StreamSpec {
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
queue.offer(1).pipeTo(testActor)
expectNoMsg(pause)
expectNoMessage(pause)
sub.request(1)
expectMsg(QueueOfferResult.Enqueued)
s.expectNext(1)
@ -117,13 +115,19 @@ class QueueSourceSpec extends StreamSpec {
queue.watchCompletion.pipeTo(testActor)
queue.offer(1) pipeTo testActor
expectNoMsg(pause)
expectNoMessage(pause)
sub.cancel()
expectMsgAllOf(QueueOfferResult.QueueClosed, Done)
}
"fail future immediately when stream is already cancelled" in assertAllStagesStopped {
val queue = Source.queue[Int](0, OverflowStrategy.dropHead).to(Sink.cancelled).run()
queue.watchCompletion.futureValue
queue.offer(1).failed.futureValue shouldBe a[StreamDetachedException]
}
"fail stream on buffer overflow in fail mode" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
@ -207,7 +211,7 @@ class QueueSourceSpec extends StreamSpec {
assertSuccess(queue.offer(1))
queue.offer(2) pipeTo testActor
expectNoMsg(pause)
expectNoMessage(pause)
sub.request(1)
s.expectNext(1)

View file

@ -336,8 +336,6 @@ import scala.collection.generic.CanBuildFrom
override def toString: String = "QueueSink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
var logicCallback: AsyncCallback[Output[T]] = null
val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] {
type Received[E] = Try[Option[E]]
@ -397,18 +395,17 @@ import scala.collection.generic.CanBuildFrom
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
logicCallback = callback
setHandler(in, this)
// SinkQueueWithCancel impl
override def pull(): Future[Option[T]] = {
val p = Promise[Option[T]]
logicCallback.invokeWithFeedback(Pull(p))
callback.invokeWithFeedback(Pull(p))
.onFailure { case NonFatal(e) p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
p.future
}
override def cancel(): Unit = {
logicCallback.invoke(QueueSink.Cancel)
callback.invoke(QueueSink.Cancel)
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.scaladsl
import scala.concurrent.Future
import akka.Done
import akka.annotation.DoNotInherit
import akka.stream.QueueOfferResult
import akka.stream.stage.GraphStageLogic