From dec4542f11a1d7d73d073594f41fb6c3e93c8126 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 8 Jan 2013 20:58:13 +0100 Subject: [PATCH] Revert "Propagate statusPromise down to the actual change in throttler, see #2877" This reverts commit 9c1a00d02017a1786820d27a62496afb7e8285ce. --- .../src/main/scala/akka/remote/Endpoint.scala | 3 -- .../scala/akka/remote/FailureDetector.scala | 3 -- .../remote/PhiAccrualFailureDetector.scala | 3 -- .../src/main/scala/akka/remote/Remoting.scala | 10 +--- .../scala/akka/remote/RemotingLifecycle.scala | 3 -- .../transport/AbstractTransportAdapter.scala | 3 -- .../akka/remote/transport/AkkaPduCodec.scala | 3 -- .../transport/AkkaProtocolTransport.scala | 6 +-- .../FailureInjectorTransportAdapter.scala | 3 -- .../akka/remote/transport/TestTransport.scala | 3 -- .../transport/ThrottlerTransportAdapter.scala | 52 +++++++------------ .../akka/remote/transport/Transport.scala | 3 -- .../remote/transport/netty/NettyHelpers.scala | 3 -- .../transport/netty/NettyTransport.scala | 3 -- .../remote/transport/netty/TcpSupport.scala | 3 -- .../remote/transport/netty/UdpSupport.scala | 3 -- 16 files changed, 22 insertions(+), 85 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 2b90447517..639084e812 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote import akka.{ OnlyCauseStackTrace, AkkaException } diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala index d4ac7b896f..d3562b2afd 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote import java.util.concurrent.TimeUnit._ diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index caff3fe387..b0525b48d8 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote import akka.remote.FailureDetector.Clock diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1653293269..be9ff83684 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote import scala.language.postfixOps @@ -403,12 +400,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val accepting: Receive = { case ManagementCommand(cmd, statusPromise) ⇒ - val allStatuses = transportMapping.values map { transport ⇒ - val p = Promise[Boolean]() - transport.managementCommand(cmd, p) - p.future - } - statusPromise completeWith Future.fold(allStatuses)(true)(_ && _) + transportMapping.values foreach { _.managementCommand(cmd, statusPromise) } case s @ Send(message, senderOption, recipientRef) ⇒ val recipientAddress = recipientRef.path.address diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala index 1c454f8101..bd44ff3982 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote import akka.event.{ LoggingAdapter, Logging } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 1163e8cea0..b46f89262b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import scala.language.postfixOps diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 3d1d68c437..b6b17c68c7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 4799bfba76..09ac87b9fa 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import akka.{ OnlyCauseStackTrace, AkkaException } @@ -378,8 +375,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Disassociate ⇒ stop() - case Heartbeat ⇒ - failureDetector.heartbeat(); stay() + case Heartbeat ⇒ failureDetector.heartbeat(); stay() case Payload(payload) ⇒ stateData match { case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 2846c72e02..59be194dde 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import FailureInjectorTransportAdapter._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index c9fb06e6bc..200e038778 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import TestTransport._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index f668953037..82143f483a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -1,12 +1,8 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import ThrottlerTransportAdapter._ import akka.actor._ import akka.pattern.pipe -import akka.remote.EndpointManager.ManagementCommand import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.ActorTransportAdapter.ListenUnderlying import akka.remote.transport.ActorTransportAdapter.ListenerRegistered @@ -21,7 +17,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Queue -import scala.concurrent.Future import scala.concurrent.Promise import scala.math.min import scala.util.{ Success, Failure } @@ -112,8 +107,10 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA protected def managerProps = Props(new ThrottlerManager(wrappedTransport)) override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { - case s: SetThrottle ⇒ manager ! ManagementCommand(s, statusPromise) - case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + case s @ SetThrottle(_, _, _) ⇒ + manager ! s + statusPromise.success(true) + case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) } } @@ -167,22 +164,17 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor handleTable ::= nakedAddress(naked) -> wrappedHandle statusPromise.success(wrappedHandle) - case ManagementCommand(SetThrottle(address, direction, mode), statusPromise) ⇒ + case s @ SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) throttlingModes += naked -> (mode, direction) - val allStatuses = handleTable.map { - case (`naked`, handle) ⇒ - val p = Promise[Boolean]() - setMode(handle, mode, direction, p) - p.future - case _ ⇒ Future.successful(true) + handleTable.foreach { + case (addr, handle) ⇒ + if (addr == naked) setMode(handle, mode, direction) } - statusPromise completeWith Future.fold(allStatuses)(true)(_ && _) - case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) handleTable ::= naked -> handle - setMode(naked, handle, Promise[Boolean]()) + setMode(naked, handle) } @@ -200,19 +192,16 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } } - private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = { + private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = { throttlingModes.get(nakedAddress) match { - case Some((mode, direction)) ⇒ setMode(handle, mode, direction, statusPromise) - case None ⇒ setMode(handle, Unthrottled, Direction.Both, statusPromise) + case Some((mode, direction)) ⇒ setMode(handle, mode, direction) + case None ⇒ setMode(handle, Unthrottled, Direction.Both) } } - private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction, - statusPromise: Promise[Boolean]): Unit = { - if (direction.includes(Direction.Send)) - handle.outboundThrottleMode.set(mode) - if (direction.includes(Direction.Receive)) - handle.throttlerActor ! ManagementCommand(mode, statusPromise) + private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = { + if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode + if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) } private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { @@ -297,9 +286,9 @@ private[transport] class ThrottledAssociation( case Event(InboundPayload(p), _) ⇒ throttledMessages = throttledMessages enqueue p stay() - case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle)) ⇒ + case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) ⇒ inboundThrottleMode = mode - try if (mode == Blackhole) { + if (inboundThrottleMode == Blackhole) { throttledMessages = Queue.empty[ByteString] exposedHandle.disassociate() stop() @@ -307,7 +296,7 @@ private[transport] class ThrottledAssociation( associationHandler notify InboundAssociation(exposedHandle) exposedHandle.readHandlerPromise.future pipeTo self goto(WaitUpstreamListener) - } finally statusPromise.success(true) + } } when(WaitUpstreamListener) { @@ -332,13 +321,12 @@ private[transport] class ThrottledAssociation( } when(Throttling) { - case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _) ⇒ + case Event(mode: ThrottleMode, _) ⇒ inboundThrottleMode = mode - if (mode == Blackhole) throttledMessages = Queue.empty[ByteString] + if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString] cancelTimer(DequeueTimerName) if (throttledMessages.nonEmpty) scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) - statusPromise.success(true) stay() case Event(InboundPayload(p), _) ⇒ forwardOrDelay(p) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 649597f4c7..b2cdbcc54e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport import scala.concurrent.{ Promise, Future } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index f4c4cd05f3..dec126cd6a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport.netty import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 001b7a35f0..361337242a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport.netty import akka.{ OnlyCauseStackTrace, ConfigurationException } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 32e4025432..515637dad0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport.netty import akka.actor.Address diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index d46c738b44..c52e8d9bc9 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -1,6 +1,3 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ package akka.remote.transport.netty import akka.actor.Address