#23118 Fail SourceQueue completion future in postStop

This commit is contained in:
Martynas Mickevičius 2017-07-13 10:57:55 +03:00 committed by Johan Andrén
parent d4092f7852
commit 57dc2763be
3 changed files with 20 additions and 7 deletions

View file

@ -53,9 +53,13 @@ import scala.util.control.NonFatal
if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer)
initCallback(callback.invoke)
}
override def postStop(): Unit = stopCallback {
case Offer(elem, promise) promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached"))
case _ // ignore
override def postStop(): Unit = {
val exception = new AbruptStageTerminationException(this)
completion.tryFailure(exception)
stopCallback {
case Offer(elem, promise) promise.failure(exception)
case _ // ignore
}
}
private def enqueueAndSuccess(offer: Offer[T]): Unit = {