Handle SubSource already timed out on failure or completion #29095 (#29605)

This commit is contained in:
Johan Andrén 2020-09-17 08:37:02 +02:00 committed by GitHub
parent 5ea4c1751c
commit e3dadd981f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -11,7 +11,6 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream._
@ -19,6 +18,7 @@ import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.impl.{ Buffer => BufferImpl }
import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.ActorSubscriberMessage.OnError
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.TraversalBuilder
@ -774,6 +774,7 @@ import akka.util.ccompat.JavaConverters._
case null =>
if (!status.compareAndSet(null, ActorSubscriberMessage.OnComplete))
status.get.asInstanceOf[AsyncCallback[Any]].invoke(ActorSubscriberMessage.OnComplete)
case OnError(_: SubscriptionTimeoutException) => // already timed out, keep the timeout as that happened first
}
def failSubstream(ex: Throwable): Unit = status.get match {
@ -782,6 +783,7 @@ import akka.util.ccompat.JavaConverters._
val failure = ActorSubscriberMessage.OnError(ex)
if (!status.compareAndSet(null, failure))
status.get.asInstanceOf[AsyncCallback[Any]].invoke(failure)
case OnError(_: SubscriptionTimeoutException) => // already timed out, keep the timeout as that happened first
}
def timeout(d: FiniteDuration): Boolean =