diff --git a/akka-docs/.history b/akka-docs/.history deleted file mode 100644 index a3abe50906..0000000000 --- a/akka-docs/.history +++ /dev/null @@ -1 +0,0 @@ -exit diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 2687ef82cc..5eea065e34 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -3,17 +3,18 @@ */ package akka.stream.scaladsl +import akka.Done import akka.actor.Status import akka.pattern.pipe import akka.stream._ import akka.stream.impl.QueueSource +import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSourceStage, TestSubscriber } +import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.Utils._ import akka.testkit.TestProbe + import scala.concurrent.duration._ import scala.concurrent._ -import akka.Done -import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestSourceStage, GraphStageMessages } -import akka.stream.testkit.scaladsl.TestSink import org.scalatest.time.Span class QueueSourceSpec extends StreamSpec { @@ -174,6 +175,15 @@ class QueueSourceSpec extends StreamSpec { expectMsgClass(classOf[Status.Failure]) } + "complete watching future with failure if materializer shut down" in assertAllStagesStopped { + val tempMap = ActorMaterializer() + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()(tempMap) + queue.watchCompletion().pipeTo(testActor) + tempMap.shutdown() + expectMsgClass(classOf[Status.Failure]) + } + "return false when elemen was not added to buffer" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run() diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index db6e986d45..11ababdf20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -53,9 +53,13 @@ import scala.util.control.NonFatal if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer) initCallback(callback.invoke) } - override def postStop(): Unit = stopCallback { - case Offer(elem, promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached")) - case _ ⇒ // ignore + override def postStop(): Unit = { + val exception = new AbruptStageTerminationException(this) + completion.tryFailure(exception) + stopCallback { + case Offer(elem, promise) ⇒ promise.failure(exception) + case _ ⇒ // ignore + } } private def enqueueAndSuccess(offer: Offer[T]): Unit = {