From e3dadd981fb64872dfeefd3b2e7c212d6a340ef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 17 Sep 2020 08:37:02 +0200 Subject: [PATCH] Handle SubSource already timed out on failure or completion #29095 (#29605) --- .../main/scala/akka/stream/impl/fusing/StreamOfStreams.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 32afa92aca..441a8f160c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -11,7 +11,6 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal - import akka.NotUsed import akka.annotation.InternalApi import akka.stream._ @@ -19,6 +18,7 @@ import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.impl.{ Buffer => BufferImpl } import akka.stream.impl.ActorSubscriberMessage +import akka.stream.impl.ActorSubscriberMessage.OnError import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.TraversalBuilder @@ -774,6 +774,7 @@ import akka.util.ccompat.JavaConverters._ case null => if (!status.compareAndSet(null, ActorSubscriberMessage.OnComplete)) status.get.asInstanceOf[AsyncCallback[Any]].invoke(ActorSubscriberMessage.OnComplete) + case OnError(_: SubscriptionTimeoutException) => // already timed out, keep the timeout as that happened first } def failSubstream(ex: Throwable): Unit = status.get match { @@ -782,6 +783,7 @@ import akka.util.ccompat.JavaConverters._ val failure = ActorSubscriberMessage.OnError(ex) if (!status.compareAndSet(null, failure)) status.get.asInstanceOf[AsyncCallback[Any]].invoke(failure) + case OnError(_: SubscriptionTimeoutException) => // already timed out, keep the timeout as that happened first } def timeout(d: FiniteDuration): Boolean =