diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala index 1394ca1b97..97389bd079 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -27,6 +27,7 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success +import akka.util.JavaDurationConverters._ object StreamRefSpec extends MultiNodeConfig { val first = role("first") @@ -244,7 +245,14 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode streamLifecycle1.expectMsg("failed-system-42-tmp") } runOn(third) { - streamLifecycle3.expectMsg("failed-system-42-tmp") + // there's a race here, we know the SourceRef actor was started but we don't know if it + // got the remote actor ref and watched it terminate or if we cut connection before that + // and it triggered the subscription timeout. Therefore we must wait more than the + // the subscription timeout for a failure + val timeout = system.settings.config + .getDuration("akka.stream.materializer.subscription-timeout.timeout") + .asScala + 2.seconds + streamLifecycle3.expectMsg(timeout, "failed-system-42-tmp") } enterBarrier("after-3") diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index b5f22e95f9..521d4c4c9f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -106,7 +106,7 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity) - private var requestStrategy: RequestStrategy = _ // initialized in preStart since depends on receiveBuffer's size + private val requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) // end of demand management --- // initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin") @@ -115,15 +115,13 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio private def getPartnerRef = partnerRef.get override def preStart(): Unit = { - requestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) - log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref) if (initialPartnerRef.isDefined) // this will set the partnerRef observeAndValidateSender( initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.") - //this timer will be cancelled if we receive the handshake from the remote SinkRef + // This timer will be cancelled if we receive the handshake from the remote SinkRef // either created in this method and provided as self.ref as initialPartnerRef // or as the response to first CumulativeDemand request sent to remote SinkRef scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)