From eff6c409d11623b85018b9c8ca142c04fc769c5c Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 10 Aug 2017 20:20:55 -0400 Subject: [PATCH] +str fix for "fail offer future when stream is completed" QueueSource/SinkSpec --- .../scala/akka/stream/scaladsl/QueueSinkSpec.scala | 5 +++-- .../akka/stream/scaladsl/QueueSourceSpec.scala | 2 +- .../akka/stream/StreamDetachedException.scala | 14 ++++++++++++++ .../main/scala/akka/stream/impl/QueueSource.scala | 4 ++-- .../src/main/scala/akka/stream/impl/Sinks.scala | 2 +- .../main/scala/akka/stream/scaladsl/Queue.scala | 3 +++ 6 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index f12188713d..7ba9d9b184 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -6,9 +6,10 @@ package akka.stream.scaladsl import akka.actor.Status import akka.pattern.pipe import akka.stream.Attributes.inputBuffer -import akka.stream.ActorMaterializer +import akka.stream.{ ActorMaterializer, StreamDetachedException } import akka.stream.testkit.Utils._ import akka.stream.testkit._ + import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -109,7 +110,7 @@ class QueueSinkSpec extends StreamSpec { sub.sendComplete() Await.result(queue.pull(), noMsgTimeout) should be(None) - queue.pull().failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + queue.pull().failed.futureValue shouldBe an[StreamDetachedException] } "keep on sending even after the buffer has been full" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 5eea065e34..057a8e3d98 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -225,7 +225,7 @@ class QueueSourceSpec extends StreamSpec { sub.cancel() expectMsg(Done) - queue.offer(1).failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + queue.offer(1).failed.futureValue shouldBe an[StreamDetachedException] } "not share future across materializations" in { diff --git a/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala b/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala new file mode 100644 index 0000000000..73888b1882 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream + +import scala.util.control.NoStackTrace + +/** + * This exception signals that materialized value is already detached from stream. This usually happens + * when stream is completed and an ActorSystem is shut down while materialized object is still available. + */ +final class StreamDetachedException + extends RuntimeException("Stream is terminated. Materialized value is detached.") + with NoStackTrace diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 25be1c8295..4c9fefa331 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -7,9 +7,9 @@ import akka.stream.OverflowStrategies._ import akka.stream._ import akka.stream.stage._ import akka.stream.scaladsl.SourceQueueWithComplete - import akka.Done import java.util.concurrent.CompletionStage + import akka.annotation.InternalApi import scala.concurrent.{ Future, Promise } @@ -46,7 +46,7 @@ import scala.compat.java8.FutureConverters._ initCallback(callback.invoke) } override def postStop(): Unit = { - val exception = new AbruptStageTerminationException(this) + val exception = new StreamDetachedException() completion.tryFailure(exception) stopCallback { case Offer(elem, promise) ⇒ promise.failure(exception) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 2cf8de2a6b..d40fdde007 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -353,7 +353,7 @@ import akka.util.OptionVal } override def postStop(): Unit = stopCallback { - case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")) + case Pull(promise) ⇒ promise.failure(new StreamDetachedException()) case _ ⇒ //do nothing } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index 5f0b6ec087..997832e024 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -6,6 +6,9 @@ package akka.stream.scaladsl import scala.concurrent.Future import akka.Done import akka.stream.QueueOfferResult +import akka.stream.stage.GraphStageLogic + +import scala.util.control.NoStackTrace /** * This trait allows to have the queue as a data source for some stream.