+str fix for "fail offer future when stream is completed" QueueSource/SinkSpec
This commit is contained in:
parent
bb106aa27e
commit
eff6c409d1
6 changed files with 24 additions and 6 deletions
|
|
@ -6,9 +6,10 @@ package akka.stream.scaladsl
|
||||||
import akka.actor.Status
|
import akka.actor.Status
|
||||||
import akka.pattern.pipe
|
import akka.pattern.pipe
|
||||||
import akka.stream.Attributes.inputBuffer
|
import akka.stream.Attributes.inputBuffer
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.{ ActorMaterializer, StreamDetachedException }
|
||||||
import akka.stream.testkit.Utils._
|
import akka.stream.testkit.Utils._
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
|
|
@ -109,7 +110,7 @@ class QueueSinkSpec extends StreamSpec {
|
||||||
sub.sendComplete()
|
sub.sendComplete()
|
||||||
Await.result(queue.pull(), noMsgTimeout) should be(None)
|
Await.result(queue.pull(), noMsgTimeout) should be(None)
|
||||||
|
|
||||||
queue.pull().failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) }
|
queue.pull().failed.futureValue shouldBe an[StreamDetachedException]
|
||||||
}
|
}
|
||||||
|
|
||||||
"keep on sending even after the buffer has been full" in assertAllStagesStopped {
|
"keep on sending even after the buffer has been full" in assertAllStagesStopped {
|
||||||
|
|
|
||||||
|
|
@ -225,7 +225,7 @@ class QueueSourceSpec extends StreamSpec {
|
||||||
sub.cancel()
|
sub.cancel()
|
||||||
expectMsg(Done)
|
expectMsg(Done)
|
||||||
|
|
||||||
queue.offer(1).failed.foreach { e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) }
|
queue.offer(1).failed.futureValue shouldBe an[StreamDetachedException]
|
||||||
}
|
}
|
||||||
|
|
||||||
"not share future across materializations" in {
|
"not share future across materializations" in {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.stream
|
||||||
|
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception signals that materialized value is already detached from stream. This usually happens
|
||||||
|
* when stream is completed and an ActorSystem is shut down while materialized object is still available.
|
||||||
|
*/
|
||||||
|
final class StreamDetachedException
|
||||||
|
extends RuntimeException("Stream is terminated. Materialized value is detached.")
|
||||||
|
with NoStackTrace
|
||||||
|
|
@ -7,9 +7,9 @@ import akka.stream.OverflowStrategies._
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.stage._
|
import akka.stream.stage._
|
||||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||||
|
|
||||||
import akka.Done
|
import akka.Done
|
||||||
import java.util.concurrent.CompletionStage
|
import java.util.concurrent.CompletionStage
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
|
|
@ -46,7 +46,7 @@ import scala.compat.java8.FutureConverters._
|
||||||
initCallback(callback.invoke)
|
initCallback(callback.invoke)
|
||||||
}
|
}
|
||||||
override def postStop(): Unit = {
|
override def postStop(): Unit = {
|
||||||
val exception = new AbruptStageTerminationException(this)
|
val exception = new StreamDetachedException()
|
||||||
completion.tryFailure(exception)
|
completion.tryFailure(exception)
|
||||||
stopCallback {
|
stopCallback {
|
||||||
case Offer(elem, promise) ⇒ promise.failure(exception)
|
case Offer(elem, promise) ⇒ promise.failure(exception)
|
||||||
|
|
|
||||||
|
|
@ -353,7 +353,7 @@ import akka.util.OptionVal
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop(): Unit = stopCallback {
|
override def postStop(): Unit = stopCallback {
|
||||||
case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))
|
case Pull(promise) ⇒ promise.failure(new StreamDetachedException())
|
||||||
case _ ⇒ //do nothing
|
case _ ⇒ //do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,9 @@ package akka.stream.scaladsl
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.Done
|
import akka.Done
|
||||||
import akka.stream.QueueOfferResult
|
import akka.stream.QueueOfferResult
|
||||||
|
import akka.stream.stage.GraphStageLogic
|
||||||
|
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This trait allows to have the queue as a data source for some stream.
|
* This trait allows to have the queue as a data source for some stream.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue