diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
index e19cd939ed..aae2b94051 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
@@ -164,15 +164,19 @@ object StreamRefsSpec {

       actor {
         provider = remote
         serialize-messages = off
+
+        default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox"
       }
-      remote.netty.tcp {
-        port = ${address.getPort}
-        hostname = "${address.getHostName}"
-      }
-
-      stream.materializer.stream-ref {
-        subscription-timeout = 3 seconds
+      remote {
+        artery {
+          enabled = on
+          transport = aeron-udp
+          canonical.hostname = "${address.getHostName}"
+          canonical.port = ${address.getPort}
+          log-received-messages = on
+          log-sent-messages = on
+        }
       }
     }
  """).withFallback(ConfigFactory.load())
@@ -283,7 +287,7 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender
       // the local "remote sink" should cancel, since it should notice the origin target actor is dead
       probe.ensureSubscription()
       val ex = probe.expectError()
-      ex.getMessage should include("has terminated! Tearing down this side of the stream as well.")
+      ex.getMessage should include("has terminated unexpectedly ")
     }

     // bug #24626
diff --git a/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes
new file mode 100644
index 0000000000..da2e35d88e
--- /dev/null
+++ b/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes
@@ -0,0 +1,4 @@
+# #25469 stream refs hardened termination via awaiting completion signal
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl.*")
+ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl$")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.StreamRefSettings.*")
diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf
index 2be6ac95f5..a29e83a9f7 100644
--- a/akka-stream/src/main/resources/reference.conf
+++ b/akka-stream/src/main/resources/reference.conf
@@ -109,6 +109,15 @@ akka {
         # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
         # in-active streams which are never subscribed to.
         subscription-timeout = 30 seconds
+
+        # In order to guard the receiving end of a stream ref from never terminating (because it is awaiting a Completion
+        # or Failed message) when a Terminated is seen before or instead of that message, a special timeout is applied
+        # once Terminated is received. This allows us to terminate stream refs targeted at other nodes which have been
+        # Downed, in which case the other side of the stream ref would never send the "final" terminal message.
+        #
+        # The timeout specifically is the time between the Terminated signal being received and the local SourceRef
+        # deciding to fail itself, assuming the completion signal was lost or a partition prevented its delivery.
+        final-termination-signal-deadline = 2 seconds
       }
       //#stream-ref
     }
diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala b/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala
index d2633b46d2..5789497b39 100644
--- a/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala
+++ b/akka-stream/src/main/scala/akka/stream/StreamRefSettings.scala
@@ -29,7 +29,8 @@ object StreamRefSettings {
     StreamRefSettingsImpl(
       bufferCapacity = c.getInt("buffer-capacity"),
       demandRedeliveryInterval = c.getDuration("demand-redelivery-interval", TimeUnit.MILLISECONDS).millis,
-      subscriptionTimeout = c.getDuration("subscription-timeout", TimeUnit.MILLISECONDS).millis
+      subscriptionTimeout = c.getDuration("subscription-timeout", TimeUnit.MILLISECONDS).millis,
+      finalTerminationSignalDeadline = c.getDuration("final-termination-signal-deadline", TimeUnit.MILLISECONDS).millis
     )
   }
 }
@@ -43,11 +44,13 @@ trait StreamRefSettings {

   def bufferCapacity: Int
   def demandRedeliveryInterval: FiniteDuration
   def subscriptionTimeout: FiniteDuration
+  def finalTerminationSignalDeadline: FiniteDuration

   // --- with... methods ---
   def withBufferCapacity(value: Int): StreamRefSettings
   def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings
   def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings
+  def withTerminationReceivedBeforeCompletionLeeway(value: FiniteDuration): StreamRefSettings

 }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
index 5cd1d96b3f..8c44270d42 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala
@@ -15,7 +15,7 @@ import akka.stream.stage._
 import akka.util.{ OptionVal, PrettyDuration }

 import scala.concurrent.{ Future, Promise }
-import scala.util.Try
+import scala.util.{ Failure, Success, Try }

 /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
 @InternalApi
@@ -79,6 +79,11 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (

       private var completedBeforeRemoteConnected: OptionVal[Try[Done]] = OptionVal.None

+      // Set to Some when this side of the stream has completed/failed, and we await the Terminated() signal back from the partner
+      // so we can safely shut down completely; this avoids *our* Terminated() signal reaching the partner before the
+      // Complete/Fail message does, which can happen on transports such as Artery which use a dedicated lane for system messages (Terminated)
+      private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None
+
       override def preStart(): Unit = {
         self = getStageActor(initialReceive)

@@ -102,8 +107,15 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
       lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = {
         case (_, Terminated(ref)) ⇒
           if (ref == getPartnerRef)
-            failStage(RemoteStreamRefActorTerminatedException(s"Remote target receiver of data $partnerRef terminated. " +
-              s"Local stream terminating, message loss (on remote side) may have happened."))
+            finishedWithAwaitingPartnerTermination match {
+              case OptionVal.Some(Failure(ex)) ⇒
+                failStage(ex)
+              case OptionVal.Some(_ /* known to be Success */) ⇒
+                completeStage() // other side has terminated (in response to a completion message) so we can safely terminate
+              case OptionVal.None ⇒
+                failStage(RemoteStreamRefActorTerminatedException(s"Remote target receiver of data $partnerRef terminated. " +
+                  s"Local stream terminating, message loss (on remote side) may have happened."))
+            }

         case (sender, StreamRefsProtocol.CumulativeDemand(d)) ⇒
           // the other side may attempt to "double subscribe", which we want to fail eagerly since we're 1:1 pairings
@@ -124,7 +136,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
         tryPull()
       }

-      private def tryPull() =
+      private def tryPull(): Unit =
         if (remoteCumulativeDemandConsumed < remoteCumulativeDemandReceived && !hasBeenPulled(in)) {
           pull(in)
         }
@@ -136,7 +148,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
             s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " +
             s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")

-        throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
+        throw ex
       }

       private def grabSequenced[T](in: Inlet[T]): StreamRefsProtocol.SequencedOnNext[T] = {
@@ -145,12 +157,12 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
         onNext
       }

-      override def onUpstreamFailure(ex: Throwable): Unit =
+      override def onUpstreamFailure(ex: Throwable): Unit = {
         partnerRef match {
           case OptionVal.Some(ref) ⇒
             ref ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
-            self.unwatch(getPartnerRef)
-            super.onUpstreamFailure(ex)
+            finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex))
+            setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)

           case _ ⇒
             completedBeforeRemoteConnected = OptionVal(scala.util.Failure(ex))
@@ -158,13 +170,14 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
             // the stage will be terminated either by timeout, or by the handling in `observeAndValidateSender`
             setKeepGoing(true)
         }
+      }

       override def onUpstreamFinish(): Unit =
         partnerRef match {
           case OptionVal.Some(ref) ⇒
             ref ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
-            self.unwatch(getPartnerRef)
-            super.onUpstreamFinish()
+            finishedWithAwaitingPartnerTermination = OptionVal(Success(Done))
+            setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
           case _ ⇒
             completedBeforeRemoteConnected = OptionVal(scala.util.Success(Done))
             // not terminating on purpose, since other side may subscribe still and then we want to complete it
@@ -181,14 +194,16 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (

         completedBeforeRemoteConnected match {
           case OptionVal.Some(scala.util.Failure(ex)) ⇒
-            log.warning("Stream already terminated with exception before remote side materialized, failing now.")
+            log.warning("Stream already terminated with exception before remote side materialized, sending failure: {}", ex)
             partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
-            failStage(ex)
+            finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex))
+            setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)

           case OptionVal.Some(scala.util.Success(Done)) ⇒
             log.warning("Stream already completed before remote side materialized, failing now.")
             partner ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
-            completeStage()
+            finishedWithAwaitingPartnerTermination = OptionVal(Success(Done))
+            setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)

           case OptionVal.None ⇒
             if (partner != getPartnerRef) {
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
index 6f1085b453..4e4eccd9e5 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala
@@ -64,6 +64,7 @@ private[stream] final class SourceRefStageImpl[Out](

       val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
       val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey"
+      val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey"

       // demand management ---
       private var completed = false
@@ -138,6 +139,10 @@ private[stream] final class SourceRefStageImpl[Out](
           log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand)
           getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
           scheduleDemandRedelivery()
+
+        case TerminationDeadlineTimerKey ⇒
+          failStage(RemoteStreamRefActorTerminatedException(s"Remote partner [$partnerRef] has terminated unexpectedly and no clean completion/failure message was received " +
+            "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
       }

       lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = {
@@ -175,19 +180,23 @@ private[stream] final class SourceRefStageImpl[Out](

         case (_, Terminated(ref)) ⇒
           partnerRef match {
             case OptionVal.Some(`ref`) ⇒
-              failStage(RemoteStreamRefActorTerminatedException(s"The remote partner $ref has terminated! " +
-                s"Tearing down this side of the stream as well."))
+              // we need to start a delayed shutdown in case we were network partitioned and the final complete/fail signal
+              // will never reach us; after the given timeout we forcefully terminate this side of the stream ref.
+              // The other (sending) side terminates by default once it gets a Terminated signal, so no special handling is needed there.
+              scheduleOnce(TerminationDeadlineTimerKey, settings.finalTerminationSignalDeadline)
+
             case _ ⇒ // this should not have happened! It should be impossible that we watched some other actor
               failStage(RemoteStreamRefActorTerminatedException(s"Received UNEXPECTED Terminated($ref) message! " +
                 s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down."))
-          }
         }

       def tryPush(): Unit =
-        if (receiveBuffer.nonEmpty && isAvailable(out)) push(out, receiveBuffer.dequeue())
-        else if (receiveBuffer.isEmpty && completed) completeStage()
+        if (receiveBuffer.nonEmpty && isAvailable(out)) {
+          val element = receiveBuffer.dequeue()
+          push(out, element)
+        } else if (receiveBuffer.isEmpty && completed) completeStage()

       private def onReceiveElement(payload: Out): Unit = {
         localRemainingRequested -= 1
@@ -230,7 +239,6 @@ private[stream] final class SourceRefStageImpl[Out](

       setHandler(out, this)
     }

-    // (logic, MaterializedSinkRef[Out](promise.future))
     (logic, promise.future)
   }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala
index 7d7c5922c3..8ee4c71bcd 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala
@@ -13,14 +13,17 @@ import scala.concurrent.duration.FiniteDuration
 /** INTERNAL API */
 @InternalApi
 private[akka] final case class StreamRefSettingsImpl private (
-  override val bufferCapacity:           Int,
-  override val demandRedeliveryInterval: FiniteDuration,
-  override val subscriptionTimeout:      FiniteDuration
+  override val bufferCapacity:                 Int,
+  override val demandRedeliveryInterval:       FiniteDuration,
+  override val subscriptionTimeout:            FiniteDuration,
+  override val finalTerminationSignalDeadline: FiniteDuration
 ) extends StreamRefSettings {

   override def withBufferCapacity(value: Int): StreamRefSettings = copy(bufferCapacity = value)
   override def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value)
   override def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings = copy(subscriptionTimeout = value)
+  override def withTerminationReceivedBeforeCompletionLeeway(value: FiniteDuration): StreamRefSettings = copy(finalTerminationSignalDeadline = value)

   override def productPrefix: String = Logging.simpleName(classOf[StreamRefSettings])
+
 }