!str #15050 Remove unused ReceiveTimeout
This commit is contained in:
parent
8b359b90d8
commit
28bb2174ee
5 changed files with 0 additions and 19 deletions
|
|
@ -109,8 +109,6 @@ case class MaterializerSettings(
|
|||
maxFanOutBufferSize: Int = 16,
|
||||
initialInputBufferSize: Int = 4,
|
||||
maximumInputBufferSize: Int = 16,
|
||||
upstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
|
||||
downstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
|
||||
dispatcher: String = Deploy.NoDispatcherGiven) {
|
||||
|
||||
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0
|
||||
|
|
@ -132,14 +130,6 @@ case class MaterializerSettings(
|
|||
def withFanOut(initialFanOutBufferSize: Int, maxFanOutBufferSize: Int): MaterializerSettings =
|
||||
copy(initialFanOutBufferSize = initialFanOutBufferSize, maxFanOutBufferSize = maxFanOutBufferSize)
|
||||
|
||||
def withSubscriptionTimeout(timeout: FiniteDuration): MaterializerSettings =
|
||||
copy(upstreamSubscriptionTimeout = timeout, downstreamSubscriptionTimeout = timeout)
|
||||
|
||||
def withSubscriptionTimeout(upstreamSubscriptionTimeout: FiniteDuration,
|
||||
downstreamSubscriptionTimeout: FiniteDuration): MaterializerSettings =
|
||||
copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout,
|
||||
downstreamSubscriptionTimeout = downstreamSubscriptionTimeout)
|
||||
|
||||
def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,8 +158,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
|
|||
var pub: ActorPublisher[T] = _
|
||||
var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
|
||||
|
||||
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
|
||||
|
||||
final def receive = {
|
||||
case ExposedPublisher(pub) ⇒
|
||||
this.pub = pub.asInstanceOf[ActorPublisher[T]]
|
||||
|
|
@ -169,7 +167,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
|
|||
final def waitingForSubscribers: Receive = {
|
||||
case SubscribePending ⇒
|
||||
pub.takePendingSubscribers() foreach registerSubscriber
|
||||
context.setReceiveTimeout(Duration.Undefined)
|
||||
context.become(active)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,7 +53,6 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
|
|||
def receive = {
|
||||
case ExposedPublisher(publisher) ⇒
|
||||
exposedPublisher = publisher
|
||||
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
|
||||
context.become(waitingForFirstSubscriber)
|
||||
case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher")
|
||||
}
|
||||
|
|
@ -61,7 +60,6 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
|
|||
def waitingForFirstSubscriber: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
|
||||
context.setReceiveTimeout(Duration.Undefined)
|
||||
import context.dispatcher
|
||||
future.pipeTo(self)
|
||||
context.become(active)
|
||||
|
|
|
|||
|
|
@ -57,7 +57,6 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting
|
|||
def receive = {
|
||||
case ExposedPublisher(publisher) ⇒
|
||||
exposedPublisher = publisher
|
||||
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
|
||||
context.become(waitingForFirstSubscriber)
|
||||
case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher")
|
||||
}
|
||||
|
|
@ -65,7 +64,6 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting
|
|||
def waitingForFirstSubscriber: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
|
||||
context.setReceiveTimeout(Duration.Undefined)
|
||||
context.become(active)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,6 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
|
|||
def receive = {
|
||||
case ExposedPublisher(publisher) ⇒
|
||||
exposedPublisher = publisher
|
||||
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
|
||||
context.become(waitingForFirstSubscriber)
|
||||
case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher")
|
||||
}
|
||||
|
|
@ -64,7 +63,6 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
|
|||
def waitingForFirstSubscriber: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
|
||||
context.setReceiveTimeout(Duration.Undefined)
|
||||
import context.dispatcher
|
||||
tickTask = Some(context.system.scheduler.schedule(initialDelay, interval, self, Tick))
|
||||
context.become(active)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue