=str #24934 sub timeout must be cancelled when streams establish conn

This commit is contained in:
Konrad `ktoso` Malawski 2018-04-23 22:44:24 +02:00
parent 0dd63837b6
commit 5f5b41f3b8
3 changed files with 59 additions and 25 deletions

View file

@ -93,6 +93,14 @@ object StreamRefsSpec {
sink pipeTo sender()
case "receive-ignore"
val sink =
StreamRefs.sinkRef[String]()
.to(Sink.ignore)
.run()
sink pipeTo sender()
case "receive-subscribe-timeout"
val sink = StreamRefs.sinkRef[String]()
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
@ -164,7 +172,7 @@ object StreamRefsSpec {
}
stream.materializer.stream-ref {
subscription-timeout = 5 seconds
subscription-timeout = 3 seconds
}
}
""").withFallback(ConfigFactory.load())
@ -289,6 +297,19 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
Await.result(eventualStrings, 8.seconds)
}
// bug #24934
"not receive timeout while data is being sent" in {
remoteActor ! "give-infinite"
val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
val done =
remoteSource.throttle(1, 200.millis)
.takeWithin(5.seconds) // which is > than the subscription timeout (so we make sure the timeout was cancelled)
.runWith(Sink.ignore)
Await.result(done, 8.seconds)
}
}
"A SinkRef" must {
@ -348,18 +369,18 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
.run()
val failure = p.expectMsgType[Failure]
failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Source reference")
failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference")
// the local "remote sink" should cancel, since it should notice the origin target actor is dead
probe.expectCancellation()
}
// bug #24626
"now receive timeout if subscribing is already done to the sink ref" in {
"not receive timeout if subscribing is already done to the sink ref" in {
remoteActor ! "receive-subscribe-timeout"
val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
Source.repeat("whatever")
.throttle(1, 100.millis, 1, ThrottleMode.Shaping)
.throttle(1, 100.millis)
.take(10) // the timeout is 500ms, so this makes sure we run more time than that
.runWith(remoteSink)
@ -369,6 +390,22 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
p.expectMsg("<COMPLETE>")
}
// bug #24934
"not receive timeout while data is being sent" in {
remoteActor ! "receive-ignore"
val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
val done =
Source.repeat("hello-24934")
.throttle(1, 300.millis)
.takeWithin(5.seconds) // which is > than the subscription timeout (so we make sure the timeout was cancelled)
.alsoToMat(Sink.last)(Keep.right)
.to(remoteSink)
.run()
Await.result(done, 7.seconds)
}
"respect back -pressure from (implied by origin Sink)" in {
remoteActor ! "receive-32"
val sinkRef = expectMsgType[SinkRef[String]]

View file

@ -82,22 +82,21 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
override def preStart(): Unit = {
self = getStageActor(initialReceive)
if (initialPartnerRef.isDefined) // this will set the `partnerRef`
observeAndValidateSender(initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SinkRef usage or impl.")
initialPartnerRef match {
case OptionVal.Some(ref)
// this will set the `partnerRef`
observeAndValidateSender(ref, "Illegal initialPartnerRef! This may be a bug, please report your " +
"usage and complete stack trace on the issue tracker: https://github.com/akka/akka")
tryPull()
case OptionVal.None
// only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor
// has not been provided with a valid initialPartnerRef)
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
}
log.debug("Created SinkRef, pointing to remote Sink receiver: {}, local worker: {}", initialPartnerRef, self.ref)
promise.success(SourceRefImpl(self.ref))
partnerRef match {
case OptionVal.Some(ref)
ref ! StreamRefsProtocol.OnSubscribeHandshake(self.ref)
tryPull()
case _
// only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor
// has not been provided with a valid initialPartnerRef
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do
}
}
lazy val initialReceive: ((ActorRef, Any)) Unit = {
@ -134,7 +133,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
case SubscriptionTimeoutTimerKey
val ex = StreamRefSubscriptionTimeoutException(
// we know the future has been competed by now, since it is in preStart
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}], " +
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}], " +
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
@ -176,6 +175,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
def observeAndValidateSender(partner: ActorRef, failureMsg: String): Unit = {
if (partnerRef.isEmpty) {
partnerRef = OptionVal(partner)
partner ! StreamRefsProtocol.OnSubscribeHandshake(self.ref)
cancelTimer(SubscriptionTimeoutTimerKey)
self.watch(partner)

View file

@ -93,13 +93,10 @@ private[stream] final class SourceRefStageImpl[Out](
promise.success(SinkRefImpl(self.ref))
partnerRef match {
case OptionVal.None
// only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of an Actor
// has not been provided with a valid initial partnerRef
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do
case _
}
//this timer will be cancelled if we receive the handshake from the remote SinkRef
// either created in this method and provided as self.ref as initialPartnerRef
// or as the response to first CumulativeDemand request sent to remote SinkRef
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
}
override def onPull(): Unit = {
@ -132,7 +129,7 @@ private[stream] final class SourceRefStageImpl[Out](
case SubscriptionTimeoutTimerKey
val ex = StreamRefSubscriptionTimeoutException(
// we know the future has been competed by now, since it is in preStart
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}]," +
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," +
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"