Eliminate race in multi node StreamRefSpec #26392
This commit is contained in:
parent
c30aacf991
commit
60a6e9bf5b
2 changed files with 11 additions and 5 deletions
|
|
@ -27,6 +27,7 @@ import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.Failure
|
import scala.util.Failure
|
||||||
import scala.util.Success
|
import scala.util.Success
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
||||||
object StreamRefSpec extends MultiNodeConfig {
|
object StreamRefSpec extends MultiNodeConfig {
|
||||||
val first = role("first")
|
val first = role("first")
|
||||||
|
|
@ -244,7 +245,14 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode
|
||||||
streamLifecycle1.expectMsg("failed-system-42-tmp")
|
streamLifecycle1.expectMsg("failed-system-42-tmp")
|
||||||
}
|
}
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
streamLifecycle3.expectMsg("failed-system-42-tmp")
|
// there's a race here, we know the SourceRef actor was started but we don't know if it
|
||||||
|
// got the remote actor ref and watched it terminate or if we cut connection before that
|
||||||
|
// and it triggered the subscription timeout. Therefore we must wait more than the
|
||||||
|
// the subscription timeout for a failure
|
||||||
|
val timeout = system.settings.config
|
||||||
|
.getDuration("akka.stream.materializer.subscription-timeout.timeout")
|
||||||
|
.asScala + 2.seconds
|
||||||
|
streamLifecycle3.expectMsg(timeout, "failed-system-42-tmp")
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-3")
|
enterBarrier("after-3")
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
|
||||||
|
|
||||||
private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity)
|
private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity)
|
||||||
|
|
||||||
private var requestStrategy: RequestStrategy = _ // initialized in preStart since depends on receiveBuffer's size
|
private val requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity)
|
||||||
// end of demand management ---
|
// end of demand management ---
|
||||||
|
|
||||||
// initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin")
|
// initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin")
|
||||||
|
|
@ -115,15 +115,13 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
|
||||||
private def getPartnerRef = partnerRef.get
|
private def getPartnerRef = partnerRef.get
|
||||||
|
|
||||||
override def preStart(): Unit = {
|
override def preStart(): Unit = {
|
||||||
requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity)
|
|
||||||
|
|
||||||
log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
|
log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
|
||||||
if (initialPartnerRef.isDefined) // this will set the partnerRef
|
if (initialPartnerRef.isDefined) // this will set the partnerRef
|
||||||
observeAndValidateSender(
|
observeAndValidateSender(
|
||||||
initialPartnerRef.get,
|
initialPartnerRef.get,
|
||||||
"Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.")
|
"Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.")
|
||||||
|
|
||||||
//this timer will be cancelled if we receive the handshake from the remote SinkRef
|
// This timer will be cancelled if we receive the handshake from the remote SinkRef
|
||||||
// either created in this method and provided as self.ref as initialPartnerRef
|
// either created in this method and provided as self.ref as initialPartnerRef
|
||||||
// or as the response to first CumulativeDemand request sent to remote SinkRef
|
// or as the response to first CumulativeDemand request sent to remote SinkRef
|
||||||
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
|
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue