Confirmation of throttle mode change, see #2877

* added some missing copyright headers
* Use ask inside ThrottlerTransportAdapter
* Change signature of managementCommand
This commit is contained in:
Patrik Nordwall 2013-01-09 12:21:31 +01:00
parent f96da34792
commit 265eaef1f6
17 changed files with 117 additions and 47 deletions

View file

@ -109,6 +109,9 @@ akka {
# FIXME document
use-passive-connections = on
# Acknowledgment timeout for commands
command-ack-timeout = 30 s
adapters {
gremlin = "akka.remote.transport.FailureInjectorProvider"
trttl = "akka.remote.transport.ThrottlerProvider"

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import java.util.concurrent.TimeUnit._

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.remote.FailureDetector.Clock

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import scala.language.postfixOps
@ -27,11 +30,12 @@ class RemotingSettings(val config: Config) {
val LogLifecycleEvents: Boolean = getBoolean("akka.remoting.log-remote-lifecycle-events")
val ShutdownTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS)
val ShutdownTimeout: Timeout =
Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS)
val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remoting.flush-wait-on-shutdown"), MILLISECONDS)
val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS)
val StartupTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS))
val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for")
@ -41,8 +45,10 @@ class RemotingSettings(val config: Config) {
val RetryWindow: FiniteDuration = Duration(getMilliseconds("akka.remoting.retry-window"), MILLISECONDS)
val BackoffPeriod: FiniteDuration =
Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS)
val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS)
val CommandAckTimeout: Timeout =
Timeout(Duration(getMilliseconds("akka.remoting.command-ack-timeout"), MILLISECONDS))
val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name
val transportConfig = transportConfigFor(name)
@ -146,7 +152,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
import scala.concurrent.ExecutionContext.Implicits.global
endpointManager match {
case Some(manager)
implicit val timeout = new Timeout(settings.ShutdownTimeout)
implicit val timeout = settings.ShutdownTimeout
val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean]
def finalize(): Unit = {
@ -182,18 +188,17 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
Props(new EndpointManager(provider.remoteSettings.config, log)), Remoting.EndpointManagerName)
endpointManager = Some(manager)
implicit val timeout = new Timeout(settings.StartupTimeout)
try {
val addressesPromise: Promise[Seq[(Transport, Address)]] = Promise()
manager ! Listen(addressesPromise)
val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future, timeout.duration)
val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future,
settings.StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)
transportMapping = transports.groupBy { case (transport, _) transport.schemeIdentifier }.mapValues {
_.toSet
}
transportMapping = transports.groupBy {
case (transport, _) transport.schemeIdentifier
} map { case (k, v) k -> v.toSet }
defaultAddress = transports.head._2
addresses = transports.map { _._2 }.toSet
@ -229,9 +234,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match {
case Some(manager)
val statusPromise = Promise[Boolean]()
manager.tell(ManagementCommand(cmd, statusPromise), sender = Actor.noSender)
statusPromise.future
import system.dispatcher
implicit val timeout = settings.CommandAckTimeout
manager ? ManagementCommand(cmd) map { case ManagementCommandAck(status) status }
case None throw new IllegalStateException("Attempted to send management command but Remoting is not running.")
}
@ -253,7 +258,8 @@ private[remote] object EndpointManager {
case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand {
override def toString = s"Remote message $senderOption -> $recipient"
}
case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand
case class ManagementCommand(cmd: Any) extends RemotingCommand
case class ManagementCommandAck(status: Boolean)
// Messages internal to EndpointManager
case object Prune
@ -389,8 +395,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
transport -> address
}
addressesPromise.success(transportsAndAddresses)
case ManagementCommand(_, statusPromise)
statusPromise.success(false)
case ManagementCommand(_)
sender ! ManagementCommandAck(false)
case StartupFinished
context.become(accepting)
case ShutdownAndFlush
@ -399,8 +405,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
val accepting: Receive = {
case ManagementCommand(cmd, statusPromise)
transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
case ManagementCommand(cmd)
val allStatuses = transportMapping.values map { transport
transport.managementCommand(cmd)
}
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender
case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.event.{ LoggingAdapter, Logging }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.language.postfixOps
@ -118,6 +121,8 @@ object ActorTransportAdapter {
case class ListenUnderlying(listenAddress: Address,
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
case object DisassociateUnderlying extends TransportOperation
implicit val AskTimeout = Timeout(5 seconds)
}
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
@ -125,8 +130,6 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
import ActorTransportAdapter._
private implicit val timeout = new Timeout(3 seconds)
protected def managerName: String
protected def managerProps: Props
// Write once variable initialized when Listen is called.

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.AkkaException

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.{ OnlyCauseStackTrace, AkkaException }
@ -86,8 +89,7 @@ private[remote] class AkkaProtocolTransport(
override val addedSchemeIdentifier: String = AkkaScheme
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit =
wrappedTransport.managementCommand(cmd, statusPromise)
override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd)
override val maximumOverhead: Int = AkkaProtocolTransport.AkkaOverhead
protected def managerName = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
@ -354,7 +356,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate
stop()
case Heartbeat failureDetector.heartbeat(); stay()
case Heartbeat
failureDetector.heartbeat(); stay()
case Payload(payload) stateData match {
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import FailureInjectorTransportAdapter._
@ -48,15 +51,15 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier
protected def maximumOverhead = 0
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
override def managementCommand(cmd: Any): Future[Boolean] = cmd match {
case All(mode)
allMode = mode
statusPromise.success(true)
Future.successful(true)
case One(address, mode)
// don't care about the protocol part - we are injected in the stack anyway!
addressChaosTable.put(address.copy(protocol = "", system = ""), mode)
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
Future.successful(true)
case _ wrappedTransport.managementCommand(cmd)
}
protected def interceptListen(listenAddress: Address,

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import TestTransport._

View file

@ -1,20 +1,24 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import ThrottlerTransportAdapter._
import akka.actor._
import akka.pattern.ask
import akka.pattern.pipe
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottledAssociation._
import akka.remote.transport.ThrottlerManager.Checkin
import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.transport.Transport._
import akka.util.ByteString
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.math.min
import scala.util.{ Success, Failure }
@ -54,7 +58,9 @@ object ThrottlerTransportAdapter {
}
}
object SetThrottle
case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode)
case object SetThrottleAck
sealed trait ThrottleMode {
def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
@ -107,11 +113,11 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
Props(new ThrottlerManager(wt))
}
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s @ SetThrottle(_, _, _)
manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
override def managementCommand(cmd: Any): Future[Boolean] = cmd match {
case s: SetThrottle
import ActorTransportAdapter.AskTimeout
manager ? s map { case SetThrottleAck true }
case _ wrappedTransport.managementCommand(cmd)
}
}
@ -150,13 +156,17 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(naked) -> wrappedHandle
statusPromise.success(wrappedHandle)
case s @ SetThrottle(address, direction, mode)
case SetThrottle(address, direction, mode)
val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction)
handleTable.foreach {
case (addr, handle)
if (addr == naked) setMode(handle, mode, direction)
val ok = Future.successful(SetThrottleAck)
val allAcks = handleTable.map {
case (`naked`, handle) setMode(handle, mode, direction)
case _ ok
}
Future.sequence(allAcks).map(_ SetThrottleAck) pipeTo sender
case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
handleTable ::= naked -> handle
@ -178,16 +188,21 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
}
private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Future[SetThrottleAck.type] = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both)
}
}
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Future[SetThrottleAck.type] = {
import ActorTransportAdapter.AskTimeout
if (direction.includes(Direction.Send))
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
(handle.throttlerActor ? mode).mapTo[SetThrottleAck.type]
else
Future.successful(SetThrottleAck)
}
private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = {
@ -274,7 +289,7 @@ private[transport] class ThrottledAssociation(
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
try if (mode == Blackhole) {
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
@ -282,7 +297,7 @@ private[transport] class ThrottledAssociation(
associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener)
}
} finally sender ! SetThrottleAck
}
when(WaitUpstreamListener) {
@ -309,10 +324,11 @@ private[transport] class ThrottledAssociation(
when(Throttling) {
case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
if (mode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
sender ! SetThrottleAck
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.concurrent.{ Promise, Future }
@ -129,7 +132,7 @@ trait Transport {
* @param cmd Command message to the transport
* @return Future that succeeds when the command was handled or dropped
*/
def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = { statusPromise.success(false) }
def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) }
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.AkkaException

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.{ OnlyCauseStackTrace, ConfigurationException }

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address