str #24934 - fix stream reference timeout
str #24934 - Change in error message -> change the value expected in test
This commit is contained in:
parent
9f7962a8c2
commit
0dd63837b6
4 changed files with 25 additions and 12 deletions
|
|
@ -15,8 +15,9 @@ import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import com.typesafe.config._
|
import com.typesafe.config._
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.{ Await, Future }
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
object StreamRefsSpec {
|
object StreamRefsSpec {
|
||||||
|
|
@ -161,6 +162,10 @@ object StreamRefsSpec {
|
||||||
port = ${address.getPort}
|
port = ${address.getPort}
|
||||||
hostname = "${address.getHostName}"
|
hostname = "${address.getHostName}"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream.materializer.stream-ref {
|
||||||
|
subscription-timeout = 5 seconds
|
||||||
|
}
|
||||||
}
|
}
|
||||||
""").withFallback(ConfigFactory.load())
|
""").withFallback(ConfigFactory.load())
|
||||||
}
|
}
|
||||||
|
|
@ -278,10 +283,11 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
|
||||||
remoteActor ! "give-subscribe-timeout"
|
remoteActor ! "give-subscribe-timeout"
|
||||||
val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
|
val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
|
||||||
// materialize directly and start consuming, timeout is 500ms
|
// materialize directly and start consuming, timeout is 500ms
|
||||||
remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping)
|
val eventualStrings: Future[immutable.Seq[String]] = remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping)
|
||||||
.take(10) // 10 * 100 millis - way more than timeout for good measure
|
.take(60) // 60 * 100 millis - data flowing for 6 seconds - both 500ms and 5s timeouts should have passed
|
||||||
.runWith(Sink.seq)
|
.runWith(Sink.seq)
|
||||||
.futureValue // this would fail if it timed out
|
|
||||||
|
Await.result(eventualStrings, 8.seconds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -342,7 +348,7 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
|
||||||
.run()
|
.run()
|
||||||
|
|
||||||
val failure = p.expectMsgType[Failure]
|
val failure = p.expectMsgType[Failure]
|
||||||
failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference")
|
failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Source reference")
|
||||||
|
|
||||||
// the local "remote sink" should cancel, since it should notice the origin target actor is dead
|
// the local "remote sink" should cancel, since it should notice the origin target actor is dead
|
||||||
probe.expectCancellation()
|
probe.expectCancellation()
|
||||||
|
|
|
||||||
|
|
@ -93,10 +93,11 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
|
||||||
case OptionVal.Some(ref) ⇒
|
case OptionVal.Some(ref) ⇒
|
||||||
ref ! StreamRefsProtocol.OnSubscribeHandshake(self.ref)
|
ref ! StreamRefsProtocol.OnSubscribeHandshake(self.ref)
|
||||||
tryPull()
|
tryPull()
|
||||||
case _ ⇒ // nothing to do
|
case _ ⇒
|
||||||
|
// only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor
|
||||||
|
// has not been provided with a valid initialPartnerRef
|
||||||
|
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = {
|
lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = {
|
||||||
|
|
|
||||||
|
|
@ -89,11 +89,17 @@ private[stream] final class SourceRefStageImpl[Out](
|
||||||
self = getStageActor(initialReceive)
|
self = getStageActor(initialReceive)
|
||||||
log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
|
log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref)
|
||||||
if (initialPartnerRef.isDefined) // this will set the partnerRef
|
if (initialPartnerRef.isDefined) // this will set the partnerRef
|
||||||
observeAndValidateSender(initialPartnerRef.get, "<no error case here, definitely valid>")
|
observeAndValidateSender(initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.")
|
||||||
|
|
||||||
promise.success(SinkRefImpl(self.ref))
|
promise.success(SinkRefImpl(self.ref))
|
||||||
|
|
||||||
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout)
|
partnerRef match {
|
||||||
|
case OptionVal.None ⇒
|
||||||
|
// only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of an Actor
|
||||||
|
// has not been provided with a valid initial partnerRef
|
||||||
|
scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
|
|
@ -126,7 +132,7 @@ private[stream] final class SourceRefStageImpl[Out](
|
||||||
case SubscriptionTimeoutTimerKey ⇒
|
case SubscriptionTimeoutTimerKey ⇒
|
||||||
val ex = StreamRefSubscriptionTimeoutException(
|
val ex = StreamRefSubscriptionTimeoutException(
|
||||||
// we know the future has been competed by now, since it is in preStart
|
// we know the future has been competed by now, since it is in preStart
|
||||||
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," +
|
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}]," +
|
||||||
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
|
s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!")
|
||||||
|
|
||||||
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
|
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
import akka.annotation.ApiMayChange
|
import akka.annotation.ApiMayChange
|
||||||
import akka.stream.{ SinkRef, SourceRef }
|
import akka.stream.{ SinkRef, SourceRef, StreamRefAttributes }
|
||||||
import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl }
|
import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl }
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue