diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 085a1584b3..c5ed6469bd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -15,8 +15,9 @@ import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe } import akka.util.ByteString import com.typesafe.config._ +import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } import scala.util.control.NoStackTrace object StreamRefsSpec { @@ -161,6 +162,10 @@ object StreamRefsSpec { port = ${address.getPort} hostname = "${address.getHostName}" } + + stream.materializer.stream-ref { + subscription-timeout = 5 seconds + } } """).withFallback(ConfigFactory.load()) } @@ -278,10 +283,11 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende remoteActor ! "give-subscribe-timeout" val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]] // materialize directly and start consuming, timeout is 500ms - remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping) - .take(10) // 10 * 100 millis - way more than timeout for good measure + val eventualStrings: Future[immutable.Seq[String]] = remoteSource.throttle(1, 100.millis, 1, ThrottleMode.Shaping) + .take(60) // 60 * 100 millis - data flowing for 6 seconds - both 500ms and 5s timeouts should have passed .runWith(Sink.seq) - .futureValue // this would fail if it timed out + + Await.result(eventualStrings, 8.seconds) } } @@ -342,7 +348,7 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende .run() val failure = p.expectMsgType[Failure] - failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference") + failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Source reference") // the local "remote sink" should cancel, since it should notice the origin target actor is dead probe.expectCancellation() diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index d824d720a3..a423636f06 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -93,10 +93,11 @@ private[stream] final class SinkRefStageImpl[In] private[akka] ( case OptionVal.Some(ref) ⇒ ref ! StreamRefsProtocol.OnSubscribeHandshake(self.ref) tryPull() - case _ ⇒ // nothing to do + case _ ⇒ + // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of the Actor + // has not been provided with a valid initialPartnerRef + scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do } - - scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) } lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index dab422f624..9f561bafdd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -89,11 +89,17 @@ private[stream] final class SourceRefStageImpl[Out]( self = getStageActor(initialReceive) log.debug("[{}] Allocated receiver: {}", stageActorName, self.ref) if (initialPartnerRef.isDefined) // this will set the partnerRef - observeAndValidateSender(initialPartnerRef.get, "") + observeAndValidateSender(initialPartnerRef.get, "Illegal initialPartnerRef! This would be a bug in the SourceRef usage or impl.") promise.success(SinkRefImpl(self.ref)) - scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) + partnerRef match { + case OptionVal.None ⇒ + // only schedule timeout timer if partnerRef has not been resolved yet (i.e. if this instance of an Actor + // has not been provided with a valid initial partnerRef + scheduleOnce(SubscriptionTimeoutTimerKey, subscriptionTimeout.timeout) // nothing to do + case _ ⇒ + } } override def onPull(): Unit = { @@ -126,7 +132,7 @@ private[stream] final class SourceRefStageImpl[Out]( case SubscriptionTimeoutTimerKey ⇒ val ex = StreamRefSubscriptionTimeoutException( // we know the future has been competed by now, since it is in preStart - s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," + + s"[$stageActorName] Remote side did not subscribe (materialize) handed out Source reference [${promise.future.value}]," + s"within subscription timeout: ${PrettyDuration.format(subscriptionTimeout.timeout)}!") throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud" diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala index d04987c28d..13a16a3a21 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamRefs.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.annotation.ApiMayChange -import akka.stream.{ SinkRef, SourceRef } +import akka.stream.{ SinkRef, SourceRef, StreamRefAttributes } import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl } import akka.util.OptionVal