Subscription timeouts should not hit after stream ref subscribe #24626

This commit is contained in:
Johan Andrén 2018-03-12 14:19:55 +01:00 committed by Konrad `ktoso` Malawski
parent d3055a7f7f
commit d03d8fdd7e
3 changed files with 31 additions and 4 deletions

View file

@ -3,13 +3,13 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.NotUsed import akka.{ Done, NotUsed }
import akka.actor.Status.Failure import akka.actor.Status.Failure
import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props } import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props }
import akka.pattern._ import akka.pattern._
import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestPublisher
import akka.stream.testkit.scaladsl._ import akka.stream.testkit.scaladsl._
import akka.stream.{ ActorMaterializer, SinkRef, SourceRef, StreamRefAttributes } import akka.stream._
import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe } import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe }
import akka.util.{ ByteString, PrettyDuration } import akka.util.{ ByteString, PrettyDuration }
import com.typesafe.config._ import com.typesafe.config._
@ -17,6 +17,7 @@ import com.typesafe.config._
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Success
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
object StreamRefsSpec { object StreamRefsSpec {
@ -68,7 +69,6 @@ object StreamRefsSpec {
.run() .run()
ref pipeTo sender() ref pipeTo sender()
// case "send-bulk" // case "send-bulk"
// /* // /*
// * Here we're able to send a source to a remote recipient // * Here we're able to send a source to a remote recipient
@ -273,6 +273,17 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
val ex = probe.expectError() val ex = probe.expectError()
ex.getMessage should include("has terminated! Tearing down this side of the stream as well.") ex.getMessage should include("has terminated! Tearing down this side of the stream as well.")
} }
// bug #24626
"not receive subscription timeout when got subscribed" in {
remoteActor ! "give-subscribe-timeout"
val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
// materialize directly and start consuming, timeout is 500ms
remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping)
.take(10) // 10 * 100 millis - way more than timeout for good measure
.runWith(Sink.seq)
.futureValue // this would fail if it timed out
}
} }
"A SinkRef" must { "A SinkRef" must {
@ -338,6 +349,21 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
probe.expectCancellation() probe.expectCancellation()
} }
// bug #24626
"now receive timeout if subscribing is already done to the sink ref" in {
remoteActor ! "receive-subscribe-timeout"
val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
Source.repeat("whatever")
.throttle(1, 100.millis, 1, ThrottleMode.Shaping)
.take(10) // the timeout is 500ms, so this makes sure we run more time than that
.runWith(remoteSink)
(0 to 9).foreach { _
p.expectMsg("whatever")
}
p.expectMsg("<COMPLETE>")
}
"respect back -pressure from (implied by origin Sink)" in { "respect back -pressure from (implied by origin Sink)" in {
remoteActor ! "receive-32" remoteActor ! "receive-32"
val sinkRef = expectMsgType[SinkRef[String]] val sinkRef = expectMsgType[SinkRef[String]]

View file

@ -175,6 +175,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = { def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = {
if (partnerRef.isEmpty) { if (partnerRef.isEmpty) {
partnerRef = OptionVal(partner) partnerRef = OptionVal(partner)
cancelTimer(SubscriptionTimeoutTimerKey)
self.watch(partner) self.watch(partner)
completedBeforeRemoteConnected match { completedBeforeRemoteConnected match {

View file

@ -139,7 +139,7 @@ private[stream] final class SourceRefStageImpl[Out](
lazy val initialReceive: ((ActorRef, Any)) Unit = { lazy val initialReceive: ((ActorRef, Any)) Unit = {
case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef))
cancelTimer("SubscriptionTimeoutTimerKey") cancelTimer(SubscriptionTimeoutTimerKey)
observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext") observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext")
log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender) log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender)