Propagate statusPromise down to the actual change in throttler, see #2877

* added some missing copyright headers
This commit is contained in:
Patrik Nordwall 2013-01-08 11:48:50 +01:00
parent 7944b456fc
commit 9c1a00d020
16 changed files with 85 additions and 22 deletions

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import java.util.concurrent.TimeUnit._

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.remote.FailureDetector.Clock

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import scala.language.postfixOps
@ -400,7 +403,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
val accepting: Receive = {
case ManagementCommand(cmd, statusPromise)
transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
val allStatuses = transportMapping.values map { transport
val p = Promise[Boolean]()
transport.managementCommand(cmd, p)
p.future
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.event.{ LoggingAdapter, Logging }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.language.postfixOps

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.AkkaException

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.{ OnlyCauseStackTrace, AkkaException }
@ -375,7 +378,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate
stop()
case Heartbeat failureDetector.heartbeat(); stay()
case Heartbeat
failureDetector.heartbeat(); stay()
case Payload(payload) stateData match {
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import FailureInjectorTransportAdapter._

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import TestTransport._

View file

@ -1,8 +1,12 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import ThrottlerTransportAdapter._
import akka.actor._
import akka.pattern.pipe
import akka.remote.EndpointManager.ManagementCommand
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
@ -17,6 +21,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.math.min
import scala.util.{ Success, Failure }
@ -107,10 +112,8 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
protected def managerProps = Props(new ThrottlerManager(wrappedTransport))
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s @ SetThrottle(_, _, _)
manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
case s: SetThrottle manager ! ManagementCommand(s, statusPromise)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
}
}
@ -164,17 +167,22 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(naked) -> wrappedHandle
statusPromise.success(wrappedHandle)
case s @ SetThrottle(address, direction, mode)
case ManagementCommand(SetThrottle(address, direction, mode), statusPromise)
val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction)
handleTable.foreach {
case (addr, handle)
if (addr == naked) setMode(handle, mode, direction)
val allStatuses = handleTable.map {
case (`naked`, handle)
val p = Promise[Boolean]()
setMode(handle, mode, direction, p)
p.future
case _ Future.successful(true)
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
handleTable ::= naked -> handle
setMode(naked, handle)
setMode(naked, handle, Promise[Boolean]())
}
@ -192,16 +200,19 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
}
private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both)
case Some((mode, direction)) setMode(handle, mode, direction, statusPromise)
case None setMode(handle, Unthrottled, Direction.Both, statusPromise)
}
}
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction,
statusPromise: Promise[Boolean]): Unit = {
if (direction.includes(Direction.Send))
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
handle.throttlerActor ! ManagementCommand(mode, statusPromise)
}
private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = {
@ -286,9 +297,9 @@ private[transport] class ThrottledAssociation(
case Event(InboundPayload(p), _)
throttledMessages = throttledMessages enqueue p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
try if (mode == Blackhole) {
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
@ -296,7 +307,7 @@ private[transport] class ThrottledAssociation(
associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener)
}
} finally statusPromise.success(true)
}
when(WaitUpstreamListener) {
@ -321,12 +332,13 @@ private[transport] class ThrottledAssociation(
}
when(Throttling) {
case Event(mode: ThrottleMode, _)
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _)
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
if (mode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
statusPromise.success(true)
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.concurrent.{ Promise, Future }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.AkkaException

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.{ OnlyCauseStackTrace, ConfigurationException }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address