From 9c1a00d02017a1786820d27a62496afb7e8285ce Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 8 Jan 2013 11:48:50 +0100 Subject: [PATCH] Propagate statusPromise down to the actual change in throttler, see #2877 * added some missing copyright headers --- .../src/main/scala/akka/remote/Endpoint.scala | 3 ++ .../scala/akka/remote/FailureDetector.scala | 3 ++ .../remote/PhiAccrualFailureDetector.scala | 3 ++ .../src/main/scala/akka/remote/Remoting.scala | 10 +++- .../scala/akka/remote/RemotingLifecycle.scala | 3 ++ .../transport/AbstractTransportAdapter.scala | 3 ++ .../akka/remote/transport/AkkaPduCodec.scala | 3 ++ .../transport/AkkaProtocolTransport.scala | 6 ++- .../FailureInjectorTransportAdapter.scala | 3 ++ .../akka/remote/transport/TestTransport.scala | 3 ++ .../transport/ThrottlerTransportAdapter.scala | 52 ++++++++++++------- .../akka/remote/transport/Transport.scala | 3 ++ .../remote/transport/netty/NettyHelpers.scala | 3 ++ .../transport/netty/NettyTransport.scala | 3 ++ .../remote/transport/netty/TcpSupport.scala | 3 ++ .../remote/transport/netty/UdpSupport.scala | 3 ++ 16 files changed, 85 insertions(+), 22 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 639084e812..2b90447517 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import akka.{ OnlyCauseStackTrace, AkkaException } diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala index d3562b2afd..d4ac7b896f 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import java.util.concurrent.TimeUnit._ diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index b0525b48d8..caff3fe387 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import akka.remote.FailureDetector.Clock diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index be9ff83684..1653293269 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import scala.language.postfixOps @@ -400,7 +403,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val accepting: Receive = { case ManagementCommand(cmd, statusPromise) ⇒ - transportMapping.values foreach { _.managementCommand(cmd, statusPromise) } + val allStatuses = transportMapping.values map { transport ⇒ + val p = Promise[Boolean]() + transport.managementCommand(cmd, p) + p.future + } + statusPromise completeWith Future.fold(allStatuses)(true)(_ && _) case s @ Send(message, senderOption, recipientRef) ⇒ val recipientAddress = recipientRef.path.address diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala index bd44ff3982..1c454f8101 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import akka.event.{ LoggingAdapter, Logging } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index b46f89262b..1163e8cea0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import scala.language.postfixOps diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index b6b17c68c7..3d1d68c437 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 09ac87b9fa..4799bfba76 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import akka.{ OnlyCauseStackTrace, AkkaException } @@ -375,7 +378,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Disassociate ⇒ stop() - case Heartbeat ⇒ failureDetector.heartbeat(); stay() + case Heartbeat ⇒ + failureDetector.heartbeat(); stay() case Payload(payload) ⇒ stateData match { case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 59be194dde..2846c72e02 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import FailureInjectorTransportAdapter._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index 200e038778..c9fb06e6bc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import TestTransport._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 82143f483a..f668953037 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -1,8 +1,12 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import ThrottlerTransportAdapter._ import akka.actor._ import akka.pattern.pipe +import akka.remote.EndpointManager.ManagementCommand import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.ActorTransportAdapter.ListenUnderlying import akka.remote.transport.ActorTransportAdapter.ListenerRegistered @@ -17,6 +21,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Queue +import scala.concurrent.Future import scala.concurrent.Promise import scala.math.min import scala.util.{ Success, Failure } @@ -107,10 +112,8 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA protected def managerProps = Props(new ThrottlerManager(wrappedTransport)) override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { - case s @ SetThrottle(_, _, _) ⇒ - manager ! s - statusPromise.success(true) - case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + case s: SetThrottle ⇒ manager ! ManagementCommand(s, statusPromise) + case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) } } @@ -164,17 +167,22 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor handleTable ::= nakedAddress(naked) -> wrappedHandle statusPromise.success(wrappedHandle) - case s @ SetThrottle(address, direction, mode) ⇒ + case ManagementCommand(SetThrottle(address, direction, mode), statusPromise) ⇒ val naked = nakedAddress(address) throttlingModes += naked -> (mode, direction) - handleTable.foreach { - case (addr, handle) ⇒ - if (addr == naked) setMode(handle, mode, direction) + val allStatuses = handleTable.map { + case (`naked`, handle) ⇒ + val p = Promise[Boolean]() + setMode(handle, mode, direction, p) + p.future + case _ ⇒ Future.successful(true) } + statusPromise completeWith Future.fold(allStatuses)(true)(_ && _) + case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) handleTable ::= naked -> handle - setMode(naked, handle) + setMode(naked, handle, Promise[Boolean]()) } @@ -192,16 +200,19 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } } - private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = { + private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = { throttlingModes.get(nakedAddress) match { - case Some((mode, direction)) ⇒ setMode(handle, mode, direction) - case None ⇒ setMode(handle, Unthrottled, Direction.Both) + case Some((mode, direction)) ⇒ setMode(handle, mode, direction, statusPromise) + case None ⇒ setMode(handle, Unthrottled, Direction.Both, statusPromise) } } - private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = { - if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode - if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) + private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction, + statusPromise: Promise[Boolean]): Unit = { + if (direction.includes(Direction.Send)) + handle.outboundThrottleMode.set(mode) + if (direction.includes(Direction.Receive)) + handle.throttlerActor ! ManagementCommand(mode, statusPromise) } private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { @@ -286,9 +297,9 @@ private[transport] class ThrottledAssociation( case Event(InboundPayload(p), _) ⇒ throttledMessages = throttledMessages enqueue p stay() - case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) ⇒ + case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle)) ⇒ inboundThrottleMode = mode - if (inboundThrottleMode == Blackhole) { + try if (mode == Blackhole) { throttledMessages = Queue.empty[ByteString] exposedHandle.disassociate() stop() @@ -296,7 +307,7 @@ private[transport] class ThrottledAssociation( associationHandler notify InboundAssociation(exposedHandle) exposedHandle.readHandlerPromise.future pipeTo self goto(WaitUpstreamListener) - } + } finally statusPromise.success(true) } when(WaitUpstreamListener) { @@ -321,12 +332,13 @@ private[transport] class ThrottledAssociation( } when(Throttling) { - case Event(mode: ThrottleMode, _) ⇒ + case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _) ⇒ inboundThrottleMode = mode - if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString] + if (mode == Blackhole) throttledMessages = Queue.empty[ByteString] cancelTimer(DequeueTimerName) if (throttledMessages.nonEmpty) scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) + statusPromise.success(true) stay() case Event(InboundPayload(p), _) ⇒ forwardOrDelay(p) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index b2cdbcc54e..649597f4c7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport import scala.concurrent.{ Promise, Future } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index dec126cd6a..f4c4cd05f3 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport.netty import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index abc475ab4b..e285f40e19 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport.netty import akka.{ OnlyCauseStackTrace, ConfigurationException } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 515637dad0..32e4025432 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport.netty import akka.actor.Address diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index c52e8d9bc9..d46c738b44 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote.transport.netty import akka.actor.Address