Revert "Propagate statusPromise down to the actual change in throttler, see #2877"

This reverts commit 9c1a00d020.
This commit is contained in:
Patrik Nordwall 2013-01-08 20:58:13 +01:00
parent 5d53ec0c52
commit dec4542f11
16 changed files with 22 additions and 85 deletions

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException } import akka.{ OnlyCauseStackTrace, AkkaException }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote package akka.remote
import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeUnit._

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote package akka.remote
import akka.remote.FailureDetector.Clock import akka.remote.FailureDetector.Clock

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote package akka.remote
import scala.language.postfixOps import scala.language.postfixOps
@ -403,12 +400,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
val accepting: Receive = { val accepting: Receive = {
case ManagementCommand(cmd, statusPromise) case ManagementCommand(cmd, statusPromise)
val allStatuses = transportMapping.values map { transport transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
val p = Promise[Boolean]()
transport.managementCommand(cmd, p)
p.future
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
case s @ Send(message, senderOption, recipientRef) case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address val recipientAddress = recipientRef.path.address

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote package akka.remote
import akka.event.{ LoggingAdapter, Logging } import akka.event.{ LoggingAdapter, Logging }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import scala.language.postfixOps import scala.language.postfixOps

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import akka.AkkaException import akka.AkkaException

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import akka.{ OnlyCauseStackTrace, AkkaException } import akka.{ OnlyCauseStackTrace, AkkaException }
@ -378,8 +375,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate case Disassociate
stop() stop()
case Heartbeat case Heartbeat failureDetector.heartbeat(); stay()
failureDetector.heartbeat(); stay()
case Payload(payload) stateData match { case Payload(payload) stateData match {
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import FailureInjectorTransportAdapter._ import FailureInjectorTransportAdapter._

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import TestTransport._ import TestTransport._

View file

@ -1,12 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import ThrottlerTransportAdapter._ import ThrottlerTransportAdapter._
import akka.actor._ import akka.actor._
import akka.pattern.pipe import akka.pattern.pipe
import akka.remote.EndpointManager.ManagementCommand
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
@ -21,7 +17,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable.Queue import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.math.min import scala.math.min
import scala.util.{ Success, Failure } import scala.util.{ Success, Failure }
@ -112,8 +107,10 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
protected def managerProps = Props(new ThrottlerManager(wrappedTransport)) protected def managerProps = Props(new ThrottlerManager(wrappedTransport))
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s: SetThrottle manager ! ManagementCommand(s, statusPromise) case s @ SetThrottle(_, _, _)
case _ wrappedTransport.managementCommand(cmd, statusPromise) manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
} }
} }
@ -167,22 +164,17 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(naked) -> wrappedHandle handleTable ::= nakedAddress(naked) -> wrappedHandle
statusPromise.success(wrappedHandle) statusPromise.success(wrappedHandle)
case ManagementCommand(SetThrottle(address, direction, mode), statusPromise) case s @ SetThrottle(address, direction, mode)
val naked = nakedAddress(address) val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction) throttlingModes += naked -> (mode, direction)
val allStatuses = handleTable.map { handleTable.foreach {
case (`naked`, handle) case (addr, handle)
val p = Promise[Boolean]() if (addr == naked) setMode(handle, mode, direction)
setMode(handle, mode, direction, p)
p.future
case _ Future.successful(true)
} }
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
case Checkin(origin, handle) case Checkin(origin, handle)
val naked: Address = nakedAddress(origin) val naked: Address = nakedAddress(origin)
handleTable ::= naked -> handle handleTable ::= naked -> handle
setMode(naked, handle, Promise[Boolean]()) setMode(naked, handle)
} }
@ -200,19 +192,16 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
} }
} }
private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = { private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
throttlingModes.get(nakedAddress) match { throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction, statusPromise) case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both, statusPromise) case None setMode(handle, Unthrottled, Direction.Both)
} }
} }
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction, private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
statusPromise: Promise[Boolean]): Unit = { if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
handle.throttlerActor ! ManagementCommand(mode, statusPromise)
} }
private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = {
@ -297,9 +286,9 @@ private[transport] class ThrottledAssociation(
case Event(InboundPayload(p), _) case Event(InboundPayload(p), _)
throttledMessages = throttledMessages enqueue p throttledMessages = throttledMessages enqueue p
stay() stay()
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle)) case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode inboundThrottleMode = mode
try if (mode == Blackhole) { if (inboundThrottleMode == Blackhole) {
throttledMessages = Queue.empty[ByteString] throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate() exposedHandle.disassociate()
stop() stop()
@ -307,7 +296,7 @@ private[transport] class ThrottledAssociation(
associationHandler notify InboundAssociation(exposedHandle) associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener) goto(WaitUpstreamListener)
} finally statusPromise.success(true) }
} }
when(WaitUpstreamListener) { when(WaitUpstreamListener) {
@ -332,13 +321,12 @@ private[transport] class ThrottledAssociation(
} }
when(Throttling) { when(Throttling) {
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _) case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode inboundThrottleMode = mode
if (mode == Blackhole) throttledMessages = Queue.empty[ByteString] if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName) cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty) if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
statusPromise.success(true)
stay() stay()
case Event(InboundPayload(p), _) case Event(InboundPayload(p), _)
forwardOrDelay(p) forwardOrDelay(p)

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport package akka.remote.transport
import scala.concurrent.{ Promise, Future } import scala.concurrent.{ Promise, Future }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty package akka.remote.transport.netty
import akka.AkkaException import akka.AkkaException

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty package akka.remote.transport.netty
import akka.{ OnlyCauseStackTrace, ConfigurationException } import akka.{ OnlyCauseStackTrace, ConfigurationException }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty package akka.remote.transport.netty
import akka.actor.Address import akka.actor.Address

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty package akka.remote.transport.netty
import akka.actor.Address import akka.actor.Address