From 28bb2174eedb99b7fc8888f03c8e905cc72e5715 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Aug 2014 10:04:21 +0200 Subject: [PATCH] !str #15050 Remove unused ReceiveTimeout --- .../src/main/scala/akka/stream/FlowMaterializer.scala | 10 ---------- .../main/scala/akka/stream/impl/ActorPublisher.scala | 3 --- .../main/scala/akka/stream/impl/FuturePublisher.scala | 2 -- .../scala/akka/stream/impl/IterablePublisher.scala | 2 -- .../main/scala/akka/stream/impl/TickPublisher.scala | 2 -- 5 files changed, 19 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index ff57528570..c73b68d4b0 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -109,8 +109,6 @@ case class MaterializerSettings( maxFanOutBufferSize: Int = 16, initialInputBufferSize: Int = 4, maximumInputBufferSize: Int = 16, - upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, - downstreamSubscriptionTimeout: FiniteDuration = 3.seconds, dispatcher: String = Deploy.NoDispatcherGiven) { private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 @@ -132,14 +130,6 @@ case class MaterializerSettings( def withFanOut(initialFanOutBufferSize: Int, maxFanOutBufferSize: Int): MaterializerSettings = copy(initialFanOutBufferSize = initialFanOutBufferSize, maxFanOutBufferSize = maxFanOutBufferSize) - def withSubscriptionTimeout(timeout: FiniteDuration): MaterializerSettings = - copy(upstreamSubscriptionTimeout = timeout, downstreamSubscriptionTimeout = timeout) - - def withSubscriptionTimeout(upstreamSubscriptionTimeout: FiniteDuration, - downstreamSubscriptionTimeout: FiniteDuration): MaterializerSettings = - copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout, - downstreamSubscriptionTimeout = downstreamSubscriptionTimeout) - def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 52f9f5a4f1..1fec18ef35 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -158,8 +158,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi var pub: ActorPublisher[T] = _ var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason - context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) - final def receive = { case ExposedPublisher(pub) ⇒ this.pub = pub.asInstanceOf[ActorPublisher[T]] @@ -169,7 +167,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi final def waitingForSubscribers: Receive = { case SubscribePending ⇒ pub.takePendingSubscribers() foreach registerSubscriber - context.setReceiveTimeout(Duration.Undefined) context.become(active) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 7a509bb048..d11fc1564e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -53,7 +53,6 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS def receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher - context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) context.become(waitingForFirstSubscriber) case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") } @@ -61,7 +60,6 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS def waitingForFirstSubscriber: Receive = { case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber - context.setReceiveTimeout(Duration.Undefined) import context.dispatcher future.pipeTo(self) context.become(active) diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala index 664334738d..b5587217a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala @@ -57,7 +57,6 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting def receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher - context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) context.become(waitingForFirstSubscriber) case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") } @@ -65,7 +64,6 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting def waitingForFirstSubscriber: Receive = { case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber - context.setReceiveTimeout(Duration.Undefined) context.become(active) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 5ca66bd893..1ef7bc033b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -56,7 +56,6 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite def receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher - context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) context.become(waitingForFirstSubscriber) case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") } @@ -64,7 +63,6 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite def waitingForFirstSubscriber: Receive = { case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber - context.setReceiveTimeout(Duration.Undefined) import context.dispatcher tickTask = Some(context.system.scheduler.schedule(initialDelay, interval, self, Tick)) context.become(active)