diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index c5ed6469bd..e19cd939ed 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -93,6 +93,14 @@ object StreamRefsSpec { sink pipeTo sender() + case "receive-ignore" ⇒ + val sink = + StreamRefs.sinkRef[String]() + .to(Sink.ignore) + .run() + + sink pipeTo sender() + case "receive-subscribe-timeout" ⇒ val sink = StreamRefs.sinkRef[String]() .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis)) @@ -164,7 +172,7 @@ object StreamRefsSpec { } stream.materializer.stream-ref { - subscription-timeout = 5 seconds + subscription-timeout = 3 seconds } } """).withFallback(ConfigFactory.load()) @@ -289,6 +297,19 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende Await.result(eventualStrings, 8.seconds) } + + // bug #24934 + "not receive timeout while data is being sent" in { + remoteActor ! "give-infinite" + val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] + + val done = + remoteSource.throttle(1, 200.millis) + .takeWithin(5.seconds) // which is > than the subscription timeout (so we make sure the timeout was cancelled) + .runWith(Sink.ignore) + + Await.result(done, 8.seconds) + } } "A SinkRef" must { @@ -348,18 +369,18 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende .run() val failure = p.expectMsgType[Failure] - failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Source reference") + failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference") // the local "remote sink" should cancel, since it should notice the origin target actor is dead probe.expectCancellation() } // bug #24626 - "now receive timeout if subscribing is already done to the sink ref" in { + "not receive timeout if subscribing is already done to the sink ref" in { remoteActor ! "receive-subscribe-timeout" val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] Source.repeat("whatever") - .throttle(1, 100.millis, 1, ThrottleMode.Shaping) + .throttle(1, 100.millis) .take(10) // the timeout is 500ms, so this makes sure we run more time than that .runWith(remoteSink) @@ -369,6 +390,22 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende p.expectMsg("") } + // bug #24934 + "not receive timeout while data is being sent" in { + remoteActor ! "receive-ignore" + val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + + val done = + Source.repeat("hello-24934") + .throttle(1, 300.millis) + .takeWithin(5.seconds) // which is > than the subscription timeout (so we make sure the timeout was cancelled) + .alsoToMat(Sink.last)(Keep.right) + .to(remoteSink) + .run() + + Await.result(done, 7.seconds) + } + "respect back -pressure from (implied by origin Sink)" in { remoteActor ! "receive-32" val sinkRef = expectMsgType[SinkRef[String]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index a423636f06..509a33ed74 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -82,22 +82,21 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( override def preStart(): Unit = { self = getStageActor(initialReceive) - if (initialPartnerRef.isDefined) // this will set the `partnerRef` - observeAndValidateSender(initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SinkRef usage or impl.") + initialPartnerRef match { + case OptionVal.Some(ref) ⇒ + // this will set the `partnerRef` + observeAndValidateSender(ref, "Illegal initialPartnerRef! This may be a bug, please report your " + + "usage and complete stack trace on the issue tracker: https://github.com/akka/akka") + tryPull() + case OptionVal.None ⇒ + // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor + // has not been provided with a valid initialPartnerRef) + scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) + } log.debug("Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", initialPartnerRef, self.ref) promise.success(SourceRefImpl(self.ref)) - - partnerRef match { - case OptionVal.Some(ref) ⇒ - ref ! StreamRefsProtocol.OnSubscribeHandshake(self.ref) - tryPull() - case _ ⇒ - // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor - // has not been provided with a valid initialPartnerRef - scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do - } } lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = { @@ -134,7 +133,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( case SubscriptionTimeoutTimerKey ⇒ val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}], " + + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" @@ -176,6 +175,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = { if (partnerRef.isEmpty) { partnerRef = OptionVal(partner) + partner ! StreamRefsProtocol.OnSubscribeHandshake(self.ref) cancelTimer(SubscriptionTimeoutTimerKey) self.watch(partner) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 9f561bafdd..10ff7d24a6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -93,13 +93,10 @@ private[stream] final class SourceRefStageImpl[Out]( promise.success(SinkRefImpl(self.ref)) - partnerRef match { - case OptionVal.None ⇒ - // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of an Actor - // has not been provided with a valid initial partnerRef - scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do - case _ ⇒ - } + //this timer will be cancelled if we receive the handshake from the remote SinkRef + // either created in this method and provided as self.ref as initialPartnerRef + // or as the response to first CumulativeDemand request sent to remote SinkRef + scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) } override def onPull(): Unit = { @@ -132,7 +129,7 @@ private[stream] final class SourceRefStageImpl[Out]( case SubscriptionTimeoutTimerKey ⇒ val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}]," + + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"