diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 0d23116770..505ee2a030 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -109,6 +109,9 @@ akka { # FIXME document use-passive-connections = on + # Acknowledgment timeout for commands + command-ack-timeout = 30 s + adapters { gremlin = "akka.remote.transport.FailureInjectorProvider" trttl = "akka.remote.transport.ThrottlerProvider" diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 639084e812..798819a4d4 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote import akka.{ OnlyCauseStackTrace, AkkaException } diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala index d3562b2afd..5c7b7ac9c0 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote import java.util.concurrent.TimeUnit._ diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index b0525b48d8..194e3b57be 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote import akka.remote.FailureDetector.Clock diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index be9ff83684..0002257ac3 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote import scala.language.postfixOps @@ -27,11 +30,12 @@ class RemotingSettings(val config: Config) { val LogLifecycleEvents: Boolean = getBoolean("akka.remoting.log-remote-lifecycle-events") - val ShutdownTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS) + val ShutdownTimeout: Timeout = + Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS) val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remoting.flush-wait-on-shutdown"), MILLISECONDS) - val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS) + val StartupTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS)) val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for") @@ -41,8 +45,10 @@ class RemotingSettings(val config: Config) { val RetryWindow: FiniteDuration = Duration(getMilliseconds("akka.remoting.retry-window"), MILLISECONDS) - val BackoffPeriod: FiniteDuration = - Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS) + val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS) + + val CommandAckTimeout: Timeout = + Timeout(Duration(getMilliseconds("akka.remoting.command-ack-timeout"), MILLISECONDS)) val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name ⇒ val transportConfig = transportConfigFor(name) @@ -146,7 +152,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc import scala.concurrent.ExecutionContext.Implicits.global endpointManager match { case Some(manager) ⇒ - implicit val timeout = new Timeout(settings.ShutdownTimeout) + implicit val timeout = settings.ShutdownTimeout val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] def finalize(): Unit = { @@ -182,18 +188,17 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc Props(new EndpointManager(provider.remoteSettings.config, log)), Remoting.EndpointManagerName) endpointManager = Some(manager) - implicit val timeout = new Timeout(settings.StartupTimeout) - try { val addressesPromise: Promise[Seq[(Transport, Address)]] = Promise() manager ! Listen(addressesPromise) - val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future, timeout.duration) + val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future, + settings.StartupTimeout.duration) if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) - transportMapping = transports.groupBy { case (transport, _) ⇒ transport.schemeIdentifier }.mapValues { - _.toSet - } + transportMapping = transports.groupBy { + case (transport, _) ⇒ transport.schemeIdentifier + } map { case (k, v) ⇒ k -> v.toSet } defaultAddress = transports.head._2 addresses = transports.map { _._2 }.toSet @@ -229,9 +234,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match { case Some(manager) ⇒ - val statusPromise = Promise[Boolean]() - manager.tell(ManagementCommand(cmd, statusPromise), sender = Actor.noSender) - statusPromise.future + import system.dispatcher + implicit val timeout = settings.CommandAckTimeout + manager ? ManagementCommand(cmd) map { case ManagementCommandAck(status) ⇒ status } case None ⇒ throw new IllegalStateException("Attempted to send management command but Remoting is not running.") } @@ -253,7 +258,8 @@ private[remote] object EndpointManager { case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand { override def toString = s"Remote message $senderOption -> $recipient" } - case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand + case class ManagementCommand(cmd: Any) extends RemotingCommand + case class ManagementCommandAck(status: Boolean) // Messages internal to EndpointManager case object Prune @@ -389,8 +395,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport -> address } addressesPromise.success(transportsAndAddresses) - case ManagementCommand(_, statusPromise) ⇒ - statusPromise.success(false) + case ManagementCommand(_) ⇒ + sender ! ManagementCommandAck(false) case StartupFinished ⇒ context.become(accepting) case ShutdownAndFlush ⇒ @@ -399,8 +405,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } val accepting: Receive = { - case ManagementCommand(cmd, statusPromise) ⇒ - transportMapping.values foreach { _.managementCommand(cmd, statusPromise) } + case ManagementCommand(cmd) ⇒ + val allStatuses = transportMapping.values map { transport ⇒ + transport.managementCommand(cmd) + } + Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender case s @ Send(message, senderOption, recipientRef) ⇒ val recipientAddress = recipientRef.path.address diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala index bd44ff3982..39977afafd 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote import akka.event.{ LoggingAdapter, Logging } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 3dae6d479b..5643951ea4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import scala.language.postfixOps @@ -118,6 +121,8 @@ object ActorTransportAdapter { case class ListenUnderlying(listenAddress: Address, upstreamListener: Future[AssociationEventListener]) extends TransportOperation case object DisassociateUnderlying extends TransportOperation + + implicit val AskTimeout = Timeout(5 seconds) } abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem) @@ -125,8 +130,6 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS import ActorTransportAdapter._ - private implicit val timeout = new Timeout(3 seconds) - protected def managerName: String protected def managerProps: Props // Write once variable initialized when Listen is called. diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index b6b17c68c7..26d0028c60 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index c99cbcb398..921c88a83d 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import akka.{ OnlyCauseStackTrace, AkkaException } @@ -86,8 +89,7 @@ private[remote] class AkkaProtocolTransport( override val addedSchemeIdentifier: String = AkkaScheme - override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = - wrappedTransport.managementCommand(cmd, statusPromise) + override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd) override val maximumOverhead: Int = AkkaProtocolTransport.AkkaOverhead protected def managerName = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}" @@ -354,7 +356,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Disassociate ⇒ stop() - case Heartbeat ⇒ failureDetector.heartbeat(); stay() + case Heartbeat ⇒ + failureDetector.heartbeat(); stay() case Payload(payload) ⇒ stateData match { case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 59be194dde..e3f204115b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import FailureInjectorTransportAdapter._ @@ -48,15 +51,15 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier protected def maximumOverhead = 0 - override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { + override def managementCommand(cmd: Any): Future[Boolean] = cmd match { case All(mode) ⇒ allMode = mode - statusPromise.success(true) + Future.successful(true) case One(address, mode) ⇒ // don't care about the protocol part - we are injected in the stack anyway! addressChaosTable.put(address.copy(protocol = "", system = ""), mode) - statusPromise.success(true) - case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + Future.successful(true) + case _ ⇒ wrappedTransport.managementCommand(cmd) } protected def interceptListen(listenAddress: Address, diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index 200e038778..b0f11f911e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import TestTransport._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index bb74937563..76d4d77b04 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -1,20 +1,24 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport -import ThrottlerTransportAdapter._ import akka.actor._ +import akka.pattern.ask import akka.pattern.pipe import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.AkkaPduCodec.Associate import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } import akka.remote.transport.ThrottledAssociation._ import akka.remote.transport.ThrottlerManager.Checkin -import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle +import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.Transport._ import akka.util.ByteString import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Queue +import scala.concurrent.Future import scala.concurrent.Promise import scala.math.min import scala.util.{ Success, Failure } @@ -54,7 +58,9 @@ object ThrottlerTransportAdapter { } } + object SetThrottle case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode) + case object SetThrottleAck sealed trait ThrottleMode { def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) @@ -107,11 +113,11 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA Props(new ThrottlerManager(wt)) } - override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { - case s @ SetThrottle(_, _, _) ⇒ - manager ! s - statusPromise.success(true) - case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + override def managementCommand(cmd: Any): Future[Boolean] = cmd match { + case s: SetThrottle ⇒ + import ActorTransportAdapter.AskTimeout + manager ? s map { case SetThrottleAck ⇒ true } + case _ ⇒ wrappedTransport.managementCommand(cmd) } } @@ -150,13 +156,17 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor handleTable ::= nakedAddress(naked) -> wrappedHandle statusPromise.success(wrappedHandle) - case s @ SetThrottle(address, direction, mode) ⇒ + case SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) throttlingModes += naked -> (mode, direction) - handleTable.foreach { - case (addr, handle) ⇒ - if (addr == naked) setMode(handle, mode, direction) + val ok = Future.successful(SetThrottleAck) + val allAcks = handleTable.map { + case (`naked`, handle) ⇒ setMode(handle, mode, direction) + case _ ⇒ ok } + + Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender + case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) handleTable ::= naked -> handle @@ -178,16 +188,21 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } } - private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = { + private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Future[SetThrottleAck.type] = { throttlingModes.get(nakedAddress) match { case Some((mode, direction)) ⇒ setMode(handle, mode, direction) case None ⇒ setMode(handle, Unthrottled, Direction.Both) } } - private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = { - if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode - if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) + private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Future[SetThrottleAck.type] = { + import ActorTransportAdapter.AskTimeout + if (direction.includes(Direction.Send)) + handle.outboundThrottleMode.set(mode) + if (direction.includes(Direction.Receive)) + (handle.throttlerActor ? mode).mapTo[SetThrottleAck.type] + else + Future.successful(SetThrottleAck) } private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { @@ -274,7 +289,7 @@ private[transport] class ThrottledAssociation( stay() case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) ⇒ inboundThrottleMode = mode - if (inboundThrottleMode == Blackhole) { + try if (mode == Blackhole) { throttledMessages = Queue.empty[ByteString] exposedHandle.disassociate() stop() @@ -282,7 +297,7 @@ private[transport] class ThrottledAssociation( associationHandler notify InboundAssociation(exposedHandle) exposedHandle.readHandlerPromise.future pipeTo self goto(WaitUpstreamListener) - } + } finally sender ! SetThrottleAck } when(WaitUpstreamListener) { @@ -309,10 +324,11 @@ private[transport] class ThrottledAssociation( when(Throttling) { case Event(mode: ThrottleMode, _) ⇒ inboundThrottleMode = mode - if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString] + if (mode == Blackhole) throttledMessages = Queue.empty[ByteString] cancelTimer(DequeueTimerName) if (throttledMessages.nonEmpty) scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) + sender ! SetThrottleAck stay() case Event(InboundPayload(p), _) ⇒ forwardOrDelay(p) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index b2cdbcc54e..82f361dfa0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport import scala.concurrent.{ Promise, Future } @@ -129,7 +132,7 @@ trait Transport { * @param cmd Command message to the transport * @return Future that succeeds when the command was handled or dropped */ - def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = { statusPromise.success(false) } + def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index dec126cd6a..94d586908b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport.netty import akka.AkkaException diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 361337242a..48436b3569 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport.netty import akka.{ OnlyCauseStackTrace, ConfigurationException } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 515637dad0..c4c7e7c674 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport.netty import akka.actor.Address diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index c52e8d9bc9..0b20b25fb7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ package akka.remote.transport.netty import akka.actor.Address