=str #25469 stream refs hardened termination via awaiting completion signal (#25561)

* =str #25469 stream refs hardened termination via awaiting completion

* review comments
This commit is contained in:
Konrad `ktoso` Malawski 2018-09-07 16:19:56 +09:00 committed by GitHub
parent 135bd5b9ed
commit 38c6276580
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 31 deletions

View file

@ -164,15 +164,19 @@ object StreamRefsSpec {
actor { actor {
provider = remote provider = remote
serialize-messages = off serialize-messages = off
default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox"
} }
remote.netty.tcp { remote {
port = ${address.getPort} artery {
hostname = "${address.getHostName}" enabled = on
} transport = aeron-udp
canonical.hostname = "${address.getHostName}"
stream.materializer.stream-ref { canonical.port = ${address.getPort}
subscription-timeout = 3 seconds log-received-messages = on
log-sent-messages = on
}
} }
} }
""").withFallback(ConfigFactory.load()) """).withFallback(ConfigFactory.load())
@ -283,7 +287,7 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
// the local "remote sink" should cancel, since it should notice the origin target actor is dead // the local "remote sink" should cancel, since it should notice the origin target actor is dead
probe.ensureSubscription() probe.ensureSubscription()
val ex = probe.expectError() val ex = probe.expectError()
ex.getMessage should include("has terminated! Tearing down this side of the stream as well.") ex.getMessage should include("has terminated unexpectedly ")
} }
// bug #24626 // bug #24626

View file

@ -0,0 +1,4 @@
# #25469 stream refs hardened termination via awaiting completion signal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl.*")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl$")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.StreamRefSettings.*")

View file

@ -109,6 +109,15 @@ akka {
# prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
# in-active streams which are never subscribed to. # in-active streams which are never subscribed to.
subscription-timeout = 30 seconds subscription-timeout = 30 seconds
# In order to guard the receiving end of a stream ref from never terminating (it awaits a Completion or Failed
# message, which may arrive before or after a Terminated is seen), a special timeout is applied once it receives Terminated.
# This allows us to terminate stream refs that were targeted at other nodes which have since been Downed, in which case the
# other side of the stream ref will never send the "final" terminal message.
#
# Specifically, the timeout is the time between the Terminated signal being received and the local SourceRef
# deciding to fail itself, on the assumption that the completion signal was lost (message loss or a network partition).
final-termination-signal-deadline = 2 seconds
} }
//#stream-ref //#stream-ref
} }

View file

@ -29,7 +29,8 @@ object StreamRefSettings {
StreamRefSettingsImpl( StreamRefSettingsImpl(
bufferCapacity = c.getInt("buffer-capacity"), bufferCapacity = c.getInt("buffer-capacity"),
demandRedeliveryInterval = c.getDuration("demand-redelivery-interval", TimeUnit.MILLISECONDS).millis, demandRedeliveryInterval = c.getDuration("demand-redelivery-interval", TimeUnit.MILLISECONDS).millis,
subscriptionTimeout = c.getDuration("subscription-timeout", TimeUnit.MILLISECONDS).millis subscriptionTimeout = c.getDuration("subscription-timeout", TimeUnit.MILLISECONDS).millis,
finalTerminationSignalDeadline = c.getDuration("final-termination-signal-deadline", TimeUnit.MILLISECONDS).millis
) )
} }
} }
@ -43,11 +44,13 @@ trait StreamRefSettings {
def bufferCapacity: Int def bufferCapacity: Int
def demandRedeliveryInterval: FiniteDuration def demandRedeliveryInterval: FiniteDuration
def subscriptionTimeout: FiniteDuration def subscriptionTimeout: FiniteDuration
def finalTerminationSignalDeadline: FiniteDuration
// --- with... methods --- // --- with... methods ---
def withBufferCapacity(value: Int): StreamRefSettings def withBufferCapacity(value: Int): StreamRefSettings
def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings
def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings
def withTerminationReceivedBeforeCompletionLeeway(value: FiniteDuration): StreamRefSettings
} }

View file

@ -15,7 +15,7 @@ import akka.stream.stage._
import akka.util.{ OptionVal, PrettyDuration } import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.util.Try import scala.util.{ Failure, Success, Try }
/** INTERNAL API: Implementation class, not intended to be touched directly by end-users */ /** INTERNAL API: Implementation class, not intended to be touched directly by end-users */
@InternalApi @InternalApi
@ -79,6 +79,11 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
private var completedBeforeRemoteConnected: OptionVal[Try[Done]] = OptionVal.None private var completedBeforeRemoteConnected: OptionVal[Try[Done]] = OptionVal.None
// Set to Some once this side of the stream has completed/failed and we are awaiting the Terminated() signal back from the partner
// so that we can safely shut down completely. This prevents *our* Terminated() signal from reaching the partner before the
// Complete/Fail message does, which can happen on transports such as Artery that use a dedicated lane for system messages (Terminated).
private[this] var finishedWithAwaitingPartnerTermination: OptionVal[Try[Done]] = OptionVal.None
override def preStart(): Unit = { override def preStart(): Unit = {
self = getStageActor(initialReceive) self = getStageActor(initialReceive)
@ -102,8 +107,15 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
lazy val initialReceive: ((ActorRef, Any)) Unit = { lazy val initialReceive: ((ActorRef, Any)) Unit = {
case (_, Terminated(ref)) case (_, Terminated(ref))
if (ref == getPartnerRef) if (ref == getPartnerRef)
failStage(RemoteStreamRefActorTerminatedException(s"Remote target receiver of data $partnerRef terminated. " + finishedWithAwaitingPartnerTermination match {
s"Local stream terminating, message loss (on remote side) may have happened.")) case OptionVal.Some(Failure(ex))
failStage(ex)
case OptionVal.Some(_ /* known to be Success*/ )
completeStage() // other side has terminated (in response to a completion message) so we can safely terminate
case OptionVal.None
failStage(RemoteStreamRefActorTerminatedException(s"Remote target receiver of data $partnerRef terminated. " +
s"Local stream terminating, message loss (on remote side) may have happened."))
}
case (sender, StreamRefsProtocol.CumulativeDemand(d)) case (sender, StreamRefsProtocol.CumulativeDemand(d))
// the other side may attempt to "double subscribe", which we want to fail eagerly since we're 1:1 pairings // the other side may attempt to "double subscribe", which we want to fail eagerly since we're 1:1 pairings
@ -124,7 +136,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
tryPull() tryPull()
} }
private def tryPull() = private def tryPull(): Unit =
if (remoteCumulativeDemandConsumed < remoteCumulativeDemandReceived && !hasBeenPulled(in)) { if (remoteCumulativeDemandConsumed < remoteCumulativeDemandReceived && !hasBeenPulled(in)) {
pull(in) pull(in)
} }
@ -136,7 +148,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " +
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" throw ex
} }
private def grabSequenced[T](in: Inlet[T]): StreamRefsProtocol.SequencedOnNext[T] = { private def grabSequenced[T](in: Inlet[T]): StreamRefsProtocol.SequencedOnNext[T] = {
@ -145,12 +157,12 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
onNext onNext
} }
override def onUpstreamFailure(ex: Throwable): Unit = override def onUpstreamFailure(ex: Throwable): Unit = {
partnerRef match { partnerRef match {
case OptionVal.Some(ref) case OptionVal.Some(ref)
ref ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) ref ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
self.unwatch(getPartnerRef) finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex))
super.onUpstreamFailure(ex) setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
case _ case _
completedBeforeRemoteConnected = OptionVal(scala.util.Failure(ex)) completedBeforeRemoteConnected = OptionVal(scala.util.Failure(ex))
@ -158,13 +170,14 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
// the stage will be terminated either by timeout, or by the handling in `observeAndValidateSender` // the stage will be terminated either by timeout, or by the handling in `observeAndValidateSender`
setKeepGoing(true) setKeepGoing(true)
} }
}
override def onUpstreamFinish(): Unit = override def onUpstreamFinish(): Unit =
partnerRef match { partnerRef match {
case OptionVal.Some(ref) case OptionVal.Some(ref)
ref ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed) ref ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
self.unwatch(getPartnerRef) finishedWithAwaitingPartnerTermination = OptionVal(Success(Done))
super.onUpstreamFinish() setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
case _ case _
completedBeforeRemoteConnected = OptionVal(scala.util.Success(Done)) completedBeforeRemoteConnected = OptionVal(scala.util.Success(Done))
// not terminating on purpose, since other side may subscribe still and then we want to complete it // not terminating on purpose, since other side may subscribe still and then we want to complete it
@ -181,14 +194,16 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
completedBeforeRemoteConnected match { completedBeforeRemoteConnected match {
case OptionVal.Some(scala.util.Failure(ex)) case OptionVal.Some(scala.util.Failure(ex))
log.warning("Stream already terminated with exception before remote side materialized, failing now.") log.warning("Stream already terminated with exception before remote side materialized, sending failure: {}", ex)
partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage) partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
failStage(ex) finishedWithAwaitingPartnerTermination = OptionVal(Failure(ex))
setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
case OptionVal.Some(scala.util.Success(Done)) case OptionVal.Some(scala.util.Success(Done))
log.warning("Stream already completed before remote side materialized, failing now.") log.warning("Stream already completed before remote side materialized, failing now.")
partner ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed) partner ! StreamRefsProtocol.RemoteStreamCompleted(remoteCumulativeDemandConsumed)
completeStage() finishedWithAwaitingPartnerTermination = OptionVal(Success(Done))
setKeepGoing(true) // we will terminate once partner ref has Terminated (to avoid racing Terminated with completion message)
case OptionVal.None case OptionVal.None
if (partner != getPartnerRef) { if (partner != getPartnerRef) {

View file

@ -64,6 +64,7 @@ private[stream] final class SourceRefStageImpl[Out](
val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey" val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey" val DemandRedeliveryTimerKey = "DemandRedeliveryTimerKey"
val TerminationDeadlineTimerKey = "TerminationDeadlineTimerKey"
// demand management --- // demand management ---
private var completed = false private var completed = false
@ -138,6 +139,10 @@ private[stream] final class SourceRefStageImpl[Out](
log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand) log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand)
getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand) getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
scheduleDemandRedelivery() scheduleDemandRedelivery()
case TerminationDeadlineTimerKey
failStage(RemoteStreamRefActorTerminatedException(s"Remote partner [$partnerRef] has terminated unexpectedly and no clean completion/failure message was received " +
"(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
} }
lazy val initialReceive: ((ActorRef, Any)) Unit = { lazy val initialReceive: ((ActorRef, Any)) Unit = {
@ -175,19 +180,23 @@ private[stream] final class SourceRefStageImpl[Out](
case (_, Terminated(ref)) case (_, Terminated(ref))
partnerRef match { partnerRef match {
case OptionVal.Some(`ref`) case OptionVal.Some(`ref`)
failStage(RemoteStreamRefActorTerminatedException(s"The remote partner $ref has terminated! " + // we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail
s"Tearing down this side of the stream as well.")) // will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref
// the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there.
scheduleOnce(TerminationDeadlineTimerKey, settings.finalTerminationSignalDeadline)
case _ case _
// this should not have happened! It should be impossible that we watched some other actor // this should not have happened! It should be impossible that we watched some other actor
failStage(RemoteStreamRefActorTerminatedException(s"Received UNEXPECTED Terminated($ref) message! " + failStage(RemoteStreamRefActorTerminatedException(s"Received UNEXPECTED Terminated($ref) message! " +
s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down.")) s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down."))
} }
} }
def tryPush(): Unit = def tryPush(): Unit =
if (receiveBuffer.nonEmpty && isAvailable(out)) push(out, receiveBuffer.dequeue()) if (receiveBuffer.nonEmpty && isAvailable(out)) {
else if (receiveBuffer.isEmpty && completed) completeStage() val element = receiveBuffer.dequeue()
push(out, element)
} else if (receiveBuffer.isEmpty && completed) completeStage()
private def onReceiveElement(payload: Out): Unit = { private def onReceiveElement(payload: Out): Unit = {
localRemainingRequested -= 1 localRemainingRequested -= 1
@ -230,7 +239,6 @@ private[stream] final class SourceRefStageImpl[Out](
setHandler(out, this) setHandler(out, this)
} }
// (logic, MaterializedSinkRef[Out](promise.future))
(logic, promise.future) (logic, promise.future)
} }

View file

@ -13,14 +13,17 @@ import scala.concurrent.duration.FiniteDuration
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] final case class StreamRefSettingsImpl private ( private[akka] final case class StreamRefSettingsImpl private (
override val bufferCapacity: Int, override val bufferCapacity: Int,
override val demandRedeliveryInterval: FiniteDuration, override val demandRedeliveryInterval: FiniteDuration,
override val subscriptionTimeout: FiniteDuration override val subscriptionTimeout: FiniteDuration,
override val finalTerminationSignalDeadline: FiniteDuration
) extends StreamRefSettings { ) extends StreamRefSettings {
override def withBufferCapacity(value: Int): StreamRefSettings = copy(bufferCapacity = value) override def withBufferCapacity(value: Int): StreamRefSettings = copy(bufferCapacity = value)
override def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value) override def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value)
override def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings = copy(subscriptionTimeout = value) override def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings = copy(subscriptionTimeout = value)
override def withTerminationReceivedBeforeCompletionLeeway(value: FiniteDuration): StreamRefSettings = copy(finalTerminationSignalDeadline = value)
override def productPrefix: String = Logging.simpleName(classOf[StreamRefSettings]) override def productPrefix: String = Logging.simpleName(classOf[StreamRefSettings])
} }