diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 7ba9d9b184..cdb9497c10 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -55,7 +55,7 @@ class QueueSinkSpec extends StreamSpec { val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) - expectNoMsg(noMsgTimeout) + expectNoMessage(noMsgTimeout) sub.sendNext(1) expectMsg(Some(1)) @@ -69,7 +69,7 @@ class QueueSinkSpec extends StreamSpec { val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) - expectNoMsg(noMsgTimeout) + expectNoMessage(noMsgTimeout) sub.sendError(ex) expectMsg(Status.Failure(ex)) @@ -84,13 +84,21 @@ class QueueSinkSpec extends StreamSpec { the[Exception] thrownBy { Await.result(queue.pull(), remainingOrDefault) } should be(ex) } + "fail future immediately if stream already canceled" in assertAllStagesStopped { + val queue = Source.empty[Int].runWith(Sink.queue()) + // race here because no way to observe that queue sink saw termination + awaitAssert({ + queue.pull().failed.futureValue shouldBe a[StreamDetachedException] + }) + } + "timeout future when stream cannot provide data" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() val queue = Source.fromPublisher(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) - expectNoMsg(noMsgTimeout) + expectNoMessage(noMsgTimeout) sub.sendNext(1) expectMsg(Some(1)) @@ -154,7 +162,7 @@ class QueueSinkSpec extends StreamSpec { expectMsg(Some(1)) queue.pull().pipeTo(testActor) - expectNoMsg(200.millis) // element requested but buffer empty + expectNoMessage(200.millis) // element requested but buffer empty sub.sendNext(2) expectMsg(Some(2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 8e104596d3..e6fd4f7fac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -8,17 +8,15 @@ import akka.actor.Status import akka.pattern.pipe import akka.stream._ import akka.stream.impl.QueueSource -import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber } -import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.Utils._ -import akka.testkit.TestProbe -import scala.concurrent.duration._ -import scala.concurrent._ -import akka.Done -import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber } +import akka.testkit.TestProbe import org.scalatest.time.Span +import scala.concurrent._ +import scala.concurrent.duration._ + class QueueSourceSpec extends StreamSpec { implicit val materializer = ActorMaterializer() implicit val ec = system.dispatcher @@ -46,7 +44,7 @@ class QueueSourceSpec extends StreamSpec { } queue.watchCompletion().pipeTo(testActor) - expectNoMsg(pause) + expectNoMessage(pause) sub.cancel() expectMsg(Done) @@ -103,7 +101,7 @@ class QueueSourceSpec extends StreamSpec { val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() val sub = s.expectSubscription queue.offer(1).pipeTo(testActor) - expectNoMsg(pause) + expectNoMessage(pause) sub.request(1) expectMsg(QueueOfferResult.Enqueued) s.expectNext(1) @@ -117,13 +115,19 @@ class QueueSourceSpec extends StreamSpec { queue.watchCompletion.pipeTo(testActor) queue.offer(1) pipeTo testActor - expectNoMsg(pause) + expectNoMessage(pause) sub.cancel() expectMsgAllOf(QueueOfferResult.QueueClosed, Done) } + "fail future immediately when stream is already cancelled" in assertAllStagesStopped { + val queue = Source.queue[Int](0, OverflowStrategy.dropHead).to(Sink.cancelled).run() + queue.watchCompletion.futureValue + queue.offer(1).failed.futureValue shouldBe a[StreamDetachedException] + } + "fail stream on buffer overflow in fail mode" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() @@ -207,7 +211,7 @@ class QueueSourceSpec extends StreamSpec { assertSuccess(queue.offer(1)) queue.offer(2) pipeTo testActor - expectNoMsg(pause) + expectNoMessage(pause) sub.request(1) s.expectNext(1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b0e2d81ab4..7283c1417e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -336,8 +336,6 @@ import scala.collection.generic.CanBuildFrom override def toString: String = "QueueSink" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - var logicCallback: AsyncCallback[Output[T]] = null - val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] { type Received[E] = Try[Option[E]] @@ -397,18 +395,17 @@ import scala.collection.generic.CanBuildFrom override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) - logicCallback = callback setHandler(in, this) // SinkQueueWithCancel impl override def pull(): Future[Option[T]] = { val p = Promise[Option[T]] - logicCallback.invokeWithFeedback(Pull(p)) + callback.invokeWithFeedback(Pull(p)) .onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) p.future } override def cancel(): Unit = { - logicCallback.invoke(QueueSink.Cancel) + callback.invoke(QueueSink.Cancel) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index 997832e024..12757a5d3c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import scala.concurrent.Future import akka.Done +import akka.annotation.DoNotInherit import akka.stream.QueueOfferResult import akka.stream.stage.GraphStageLogic