From d03d8fdd7e5b1309a8aa5aa0c72e023cefa44ec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 12 Mar 2018 14:19:55 +0100 Subject: [PATCH] Subscription timeouts should not hit after stream ref subscribe #24626 --- .../akka/stream/scaladsl/StreamRefsSpec.scala | 32 +++++++++++++++++-- .../stream/impl/streamref/SinkRefImpl.scala | 1 + .../stream/impl/streamref/SourceRefImpl.scala | 2 +- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 06b9333c1d..c1a34133e8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -3,13 +3,13 @@ */ package akka.stream.scaladsl -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.actor.Status.Failure import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props } import akka.pattern._ import akka.stream.testkit.TestPublisher import akka.stream.testkit.scaladsl._ -import akka.stream.{ ActorMaterializer, SinkRef, SourceRef, StreamRefAttributes } +import akka.stream._ import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe } import akka.util.{ ByteString, PrettyDuration } import com.typesafe.config._ @@ -17,6 +17,7 @@ import com.typesafe.config._ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.Future +import scala.util.Success import scala.util.control.NoStackTrace object StreamRefsSpec { @@ -68,7 +69,6 @@ object StreamRefsSpec { .run() ref pipeTo sender() - // case "send-bulk" ⇒ // /* // * Here we're able to send a source to a remote recipient @@ -273,6 +273,17 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende val ex = probe.expectError() ex.getMessage should include("has terminated! Tearing down this side of the stream as well.") } + + // bug #24626 + "not receive subscription timeout when got subscribed" in { + remoteActor ! "give-subscribe-timeout" + val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] + // materialize directly and start consuming, timeout is 500ms + remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping) + .take(10) // 10 * 100 millis - way more than timeout for good measure + .runWith(Sink.seq) + .futureValue // this would fail if it timed out + } } "A SinkRef" must { @@ -338,6 +349,21 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende probe.expectCancellation() } + // bug #24626 + "now receive timeout if subscribing is already done to the sink ref" in { + remoteActor ! "receive-subscribe-timeout" + val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + Source.repeat("whatever") + .throttle(1, 100.millis, 1, ThrottleMode.Shaping) + .take(10) // the timeout is 500ms, so this makes sure we run more time than that + .runWith(remoteSink) + + (0 to 9).foreach { _ ⇒ + p.expectMsg("whatever") + } + p.expectMsg("") + } + "respect back -pressure from (implied by origin Sink)" in { remoteActor ! "receive-32" val sinkRef = expectMsgType[SinkRef[String]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 409811d898..7a9fbad7a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -175,6 +175,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = { if (partnerRef.isEmpty) { partnerRef = OptionVal(partner) + cancelTimer(SubscriptionTimeoutTimerKey) self.watch(partner) completedBeforeRemoteConnected match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index b344e6dee7..6dcb14afa9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -139,7 +139,7 @@ private[stream] final class SourceRefStageImpl[Out]( lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = { case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) ⇒ - cancelTimer("SubscriptionTimeoutTimerKey") + cancelTimer(SubscriptionTimeoutTimerKey) observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext") log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender)