From 0705d47a88aa8c705e94bef7a0b7fe7a9533e8b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 23 Nov 2012 10:15:19 +0100 Subject: [PATCH] Streamlined configuration, transport adapters and FailureInjector - Transports no longer uses raw ActorRefs as listeners but proper interfaces. - Added managementCommand support to Transports - Added support for dynamically loadable transport adapters - Added throttler/failure injector transport adapter - added actor based adapter support - Changed configuration method of multiple transports - Fixed tests to work with the new remoting --- akka-remote/src/main/resources/reference.conf | 53 ++- .../src/main/scala/akka/remote/Endpoint.scala | 16 +- .../remote/PhiAccrualFailureDetector.scala | 3 +- .../akka/remote/RemoteActorRefProvider.scala | 25 +- .../scala/akka/remote/RemoteTransport.scala | 9 + .../src/main/scala/akka/remote/Remoting.scala | 77 +++- .../transport/AbstractTransportAdapter.scala | 151 +++++++ .../transport/AkkaProtocolTransport.scala | 160 +++---- .../FailureInjectorTransportAdapter.scala | 138 ++++++ .../akka/remote/transport/TestTransport.scala | 371 ++++++++------- .../transport/ThrottlerTransportAdapter.scala | 425 ++++++++++++++++++ .../akka/remote/transport/Transport.scala | 127 ++++-- .../transport/netty/NettyTransport.scala | 47 +- .../remote/transport/netty/TcpSupport.scala | 32 +- .../remote/transport/netty/UdpSupport.scala | 30 +- .../akka/remote/RemoteCommunicationSpec.scala | 3 +- .../scala/akka/remote/RemoteConfigSpec.scala | 5 +- .../akka/remote/RemoteDeathWatchSpec.scala | 11 +- .../akka/remote/RemoteDeployerSpec.scala | 2 +- .../scala/akka/remote/RemoteRouterSpec.scala | 52 +-- .../test/scala/akka/remote/RemotingSpec.scala | 123 +---- .../remote/Ticket1978CommunicationSpec.scala | 2 +- .../akka/remote/Ticket1978ConfigSpec.scala | 48 +- .../scala/akka/remote/UntrustedSpec.scala | 9 +- .../remote/transport/AkkaProtocolSpec.scala | 2 +- .../transport/AkkaProtocolStressTest.scala | 90 ++++ .../transport/GenericTransportSpec.scala | 167 +++++++ .../remote/transport/TestTransportSpec.scala | 26 +- .../remote/transport/ThrottleModeSpec.scala | 99 ++++ .../ThrottlerTransportAdapterSpec.scala | 94 ++++ 30 files changed, 1812 insertions(+), 585 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala create mode 100644 akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala rename akka-remote/src/{test => main}/scala/akka/remote/transport/TestTransport.scala (78%) create mode 100644 akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala create mode 100644 akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala create mode 100644 akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 69fa271b9a..9c41c32f2f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -80,7 +80,7 @@ akka { wait-activity-enabled = on # FIXME document - backoff-interval = 1 s + backoff-interval = 0.01 s # FIXME document secure-cookie = "" @@ -105,13 +105,62 @@ akka { # FIXME document use-passive-connections = on + + adapters { + gremlin = "akka.remote.transport.FailureInjectorProvider" + trttl = "akka.remote.transport.ThrottlerProvider" + } + + enabled-transports = ["tcp"] + + transports.tcp { + transport-class = "akka.remote.transport.netty.NettyTransport" + applied-adapters = [] + + transport-protocol = tcp + port = 2552 + hostname = "localhost" #FIXME Empty string should default to localhost + enable-ssl = false + log-transport-events = true + connection-timeout = 120s + use-dispatcher-for-io = "" + write-buffer-high-water-mark = 0b + write-buffer-low-water-mark = 0b + send-buffer-size = 32000b + receive-buffer-size = 32000b + backlog = 4096 + + server-socket-worker-pool { + pool-size-min = 2 + pool-size-factor = 1.0 + pool-size-max = 8 + } + + client-socket-worker-pool { + pool-size-min = 2 + pool-size-factor = 1.0 + pool-size-max = 8 + } + } + + transports.udp = ${akka.remoting.transports.tcp} + transports.udp { + transport-protocol = udp + } + + transports.ssl = ${akka.remoting.transports.tcp} + transports.ssl = { + enable-ssl = true + } + } remote { # Which implementation of akka.remote.RemoteTransport to use # default is a TCP-based remote transport based on Netty - transport = "akka.remote.netty.NettyRemoteTransport" + transport = "akka.remote.Remoting" + # Enable untrusted mode for full security of server managed actors, prevents # system messages to be send by clients, e.g. messages like 'Create', diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 68c06e96ea..2fe6b7ed2b 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -46,13 +46,15 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem, recipient match { case `remoteDaemon` ⇒ - if (LogReceive) log.debug("received daemon message {}", msgLog) - payload match { - case m @ (_: DaemonMsg | _: Terminated) ⇒ - try remoteDaemon ! m catch { - case NonFatal(e) ⇒ log.error(e, "exception while processing remote command {} from {}", m, sender) - } - case x ⇒ log.debug("remoteDaemon received illegal message {} from {}", x, sender) + if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") else { + if (LogReceive) log.debug("received daemon message {}", msgLog) + payload match { + case m @ (_: DaemonMsg | _: Terminated) ⇒ + try remoteDaemon ! m catch { + case NonFatal(e) ⇒ log.error(e, "exception while processing remote command {} from {}", m, sender) + } + case x ⇒ log.debug("remoteDaemon received illegal message {} from {}", x, sender) + } } case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index f0252106c7..d51c2b90ba 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -134,7 +134,8 @@ class PhiAccrualFailureDetector( /** * Cumulative distribution function for N(mean, stdDeviation) normal distribution. - * This is an approximation defined in β Mathematics Handbook. + * This is an approximation defined in β Mathematics Handbook (Logistic approximation). + * Error is 0.00014 at +- 3.16 */ private[akka] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = { val y = (x - mean) / stdDeviation diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 8f1c440157..e2577f3804 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -164,9 +164,15 @@ class RemoteActorRefProvider( if (isSelfAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { - val localAddress = transport.localAddressForRemote(addr) - val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements - new RemoteActorRef(this, transport, localAddress, rpath, supervisor, Some(props), Some(d)) + try { + val localAddress = transport.localAddressForRemote(addr) + val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements + new RemoteActorRef(this, transport, localAddress, rpath, supervisor, Some(props), Some(d)) + } catch { + case NonFatal(e) ⇒ + log.error(e, "Error while looking up address {}", addr) + new EmptyLocalActorRef(this, path, eventStream) + } } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) @@ -174,10 +180,17 @@ class RemoteActorRefProvider( } } - def actorFor(path: ActorPath): InternalActorRef = + def actorFor(path: ActorPath): InternalActorRef = { if (isSelfAddress(path.address)) actorFor(rootGuardian, path.elements) - else new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address), - path, Nobody, props = None, deploy = None) + else try { + new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address), + path, Nobody, props = None, deploy = None) + } catch { + case NonFatal(e) ⇒ + log.error(e, "Error while looking up address {}", path.address) + new EmptyLocalActorRef(this, path, eventStream) + } + } def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index dddb74cede..1e78351ac2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -12,6 +12,7 @@ import akka.serialization.Serialization import akka.remote.RemoteProtocol._ import akka.actor._ import scala.collection.immutable +import scala.concurrent.Future /** * Remote life-cycle events. @@ -220,6 +221,14 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message) } + /** + * Sends a management command to the underlying transport stack. The call returns with a Future that indicates + * if the command was handled successfully or dropped. + * @param cmd Command message to send to the transports. + * @return A Future that indicates when the message was successfully handled or dropped. + */ + def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) } + /** * A Logger that can be used to log issues that may occur */ diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index e5b050b223..7270e2d1bf 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -1,14 +1,15 @@ package akka.remote +import scala.language.postfixOps import akka.actor.SupervisorStrategy._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.pattern.gracefulStop -import akka.remote.EndpointManager.{ StartupFinished, Listen, Send } -import akka.remote.transport.Transport.InboundAssociation +import akka.remote.EndpointManager.{ StartupFinished, ManagementCommand, Listen, Send } +import akka.remote.transport.Transport.{ AssociationEventListener, InboundAssociation } import akka.remote.transport._ import akka.util.Timeout -import com.typesafe.config.Config +import com.typesafe.config.{ ConfigFactory, Config } import scala.collection.immutable.{ Seq, HashMap } import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await, Future } @@ -18,8 +19,9 @@ import java.util.concurrent.TimeoutException import scala.util.{ Failure, Success } import scala.collection.immutable import akka.japi.Util.immutableSeq +import akka.remote.Remoting.RegisterTransportActor -class RemotingSettings(config: Config) { +class RemotingSettings(val config: Config) { import config._ import scala.collection.JavaConverters._ @@ -30,7 +32,7 @@ class RemotingSettings(config: Config) { val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS) - val RetryGateClosedFor: Long = getMilliseconds("akka.remoting.retry-gate-closed-for") + val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for") val UsePassiveConnections: Boolean = getBoolean("akka.remoting.use-passive-connections") @@ -41,10 +43,21 @@ class RemotingSettings(config: Config) { val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS) - val Transports: immutable.Seq[(String, Config)] = - immutableSeq(config.getConfigList("akka.remoting.transports")).map { - conf ⇒ (conf.getString("transport-class"), conf.getConfig("settings")) - } + val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name ⇒ + val transportConfig = transportConfigFor(name) + (transportConfig.getString("transport-class"), + immutableSeq(transportConfig.getStringList("applied-adapters")), + transportConfig) + } + + val Adapters: Map[String, String] = configToMap(getConfig("akka.remoting.adapters")) + + private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remoting.enabled-transports")) + + private def transportConfigFor(transportName: String): Config = getConfig("akka.remoting.transports." + transportName) + + private def configToMap(cfg: Config): Map[String, String] = + cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } } private[remote] object Remoting { @@ -60,7 +73,7 @@ private[remote] object Remoting { responsibleTransports.size match { case 0 ⇒ throw new RemoteTransportException( - s"No transport is responsible for address: [${remote}] although protocol [${remote.protocol}] is available." + + s"No transport is responsible for address: ${remote} although protocol ${remote.protocol} is available." + " Make sure at least one transport is configured to be responsible for the address.", null) @@ -74,10 +87,13 @@ private[remote] object Remoting { "so that only one transport is responsible for the address.", null) } - case None ⇒ throw new RemoteTransportException(s"No transport is loaded for protocol: ${remote.protocol}", null) + case None ⇒ throw new RemoteTransportException( + s"No transport is loaded for protocol: ${remote.protocol}, available protocols: ${transportMapping.keys.mkString}", null) } } + case class RegisterTransportActor(props: Props, name: String) + } private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { @@ -90,6 +106,16 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private val settings = new RemotingSettings(provider.remoteSettings.config) + val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + override def supervisorStrategy = OneForOneStrategy() { + case NonFatal(e) ⇒ Restart + } + + def receive = { + case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name) + } + }), "transports") + override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) val log: LoggingAdapter = Logging(system.eventStream, "Remoting") @@ -164,6 +190,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc endpointManager.tell(Send(message, senderOption, recipient), sender = Actor.noSender) } + override def managementCommand(cmd: Any): Future[Boolean] = { + val statusPromise = Promise[Boolean]() + endpointManager.tell(ManagementCommand(cmd, statusPromise), sender = Actor.noSender) + statusPromise.future + } + // Not used anywhere only to keep compatibility with RemoteTransport interface protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode @@ -182,6 +214,8 @@ private[remote] object EndpointManager { override def toString = s"Remote message $senderOption -> $recipient" } + case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand + sealed trait EndpointPolicy case class Pass(endpoint: ActorRef) extends EndpointPolicy case class Gated(timeOfFailure: Long) extends EndpointPolicy @@ -299,10 +333,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.stop(self) } - case StartupFinished ⇒ context.become(accepting) + case ManagementCommand(_, statusPromise) ⇒ statusPromise.success(false) + + case StartupFinished ⇒ context.become(accepting) } val accepting: Receive = { + case ManagementCommand(cmd, statusPromise) ⇒ + transportMapping.values foreach { _.managementCommand(cmd, statusPromise) } + case s @ Send(message, senderOption, recipientRef) ⇒ val recipientAddress = recipientRef.path.address @@ -337,11 +376,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } private def initializeTransports(addressesPromise: Promise[Set[(Transport, Address)]]): Unit = { - val transports = for ((fqn, config) ← settings.Transports) yield { + val transports = for ((fqn, adapters, config) ← settings.Transports) yield { val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config) - val wrappedTransport = extendedSystem.dynamicAccess + val driver = extendedSystem.dynamicAccess .createInstanceFor[Transport](fqn, args).recover({ case exception ⇒ throw new IllegalArgumentException( @@ -351,11 +390,17 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends }).get + val wrappedTransport = + adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) { + (t: Transport, provider: TransportAdapterProvider) ⇒ + provider(t, context.system.asInstanceOf[ExtendedActorSystem]) + } + new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec) } - val listens: Future[Seq[(Transport, (Address, Promise[ActorRef]))]] = Future.sequence( + val listens: Future[Seq[(Transport, (Address, Promise[AssociationEventListener]))]] = Future.sequence( transports.map { transport ⇒ transport.listen map (transport -> _) }) listens.onComplete { @@ -394,7 +439,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends .withDispatcher("akka.remoting.writer-dispatcher"), "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()) - context.watch(endpoint) // TODO: see what to do with this + context.watch(endpoint) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala new file mode 100644 index 0000000000..17105a1df5 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -0,0 +1,151 @@ +package akka.remote.transport + +import scala.language.postfixOps +import akka.actor._ +import akka.pattern.ask +import akka.remote.transport.Transport._ +import akka.remote.{ RemotingSettings, RemoteActorRefProvider } +import scala.collection.immutable +import scala.concurrent.{ Await, ExecutionContext, Promise, Future } +import scala.util.Success +import scala.util.Failure +import akka.remote.Remoting.RegisterTransportActor +import akka.util.Timeout +import scala.concurrent.duration._ + +trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) ⇒ Transport) + +class TransportAdapters(system: ExtendedActorSystem) extends Extension { + val settings = new RemotingSettings(system.provider.asInstanceOf[RemoteActorRefProvider].remoteSettings.config) + + private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) ← settings.Adapters) yield { + name -> system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({ + case exception ⇒ throw new IllegalArgumentException("Cannot instantiate transport adapter" + fqn, exception) + }).get + } + + def getAdapterProvider(name: String): TransportAdapterProvider = adaptersTable.get(name) match { + case Some(provider) ⇒ provider + case None ⇒ throw new IllegalArgumentException("There is no registered transport adapter provider with name: " + name) + } +} + +object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with ExtensionIdProvider { + override def get(system: ActorSystem): TransportAdapters = super.get(system) + override def lookup = TransportAdaptersExtension + override def createExtension(system: ExtendedActorSystem): TransportAdapters = + new TransportAdapters(system) +} + +trait SchemeAugmenter { + protected def addedSchemeIdentifier: String + + protected def augmentScheme(originalScheme: String): String = s"$originalScheme.$addedSchemeIdentifier" + + protected def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol)) + + protected def removeScheme(scheme: String): String = if (scheme.endsWith(s".$addedSchemeIdentifier")) + scheme.take(scheme.length - addedSchemeIdentifier.length - 1) + else scheme + + protected def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol)) +} + +/** + * An adapter that wraps a transport and provides interception + */ +abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport, implicit val ec: ExecutionContext) + extends Transport with SchemeAugmenter { + + protected def maximumOverhead: Int + + protected def interceptListen(listenAddress: Address, + listenerFuture: Future[AssociationEventListener]): AssociationEventListener + + protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit + + override def schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier) + + override def isResponsibleFor(address: Address): Boolean = wrappedTransport.isResponsibleFor(address) + + override def maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - maximumOverhead + + override def listen: Future[(Address, Promise[AssociationEventListener])] = { + val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise() + val upstreamListenerPromise: Promise[AssociationEventListener] = Promise() + wrappedTransport.listen.onComplete { + case Success((listenAddress, listenerPromise)) ⇒ + // Register to downstream + listenerPromise.success(interceptListen(listenAddress, upstreamListenerPromise.future)) + // Notify upstream + listenPromise.success((augmentScheme(listenAddress), upstreamListenerPromise)) + case Failure(reason) ⇒ listenPromise.failure(reason) + } + listenPromise.future + } + + override def associate(remoteAddress: Address): Future[Status] = { + // Prepare a future, and pass its promise to the manager + val statusPromise: Promise[Status] = Promise() + + interceptAssociate(removeScheme(remoteAddress), statusPromise) + + statusPromise.future + } + + override def shutdown(): Unit = wrappedTransport.shutdown() + +} + +abstract class AbstractTransportAdapterHandle(val originalLocalAddress: Address, + val originalRemoteAddress: Address, + val wrappedHandle: AssociationHandle, + val addedSchemeIdentifier: String) extends AssociationHandle + with SchemeAugmenter { + + def this(wrappedHandle: AssociationHandle, addedSchemeIdentifier: String) = + this(wrappedHandle.localAddress, + wrappedHandle.remoteAddress, + wrappedHandle, + addedSchemeIdentifier) + + override val localAddress = augmentScheme(originalLocalAddress) + override val remoteAddress = augmentScheme(originalRemoteAddress) + +} + +object ActorTransportAdapter { + sealed trait TransportOperation + + case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation + case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation + case class ListenUnderlying(listenAddress: Address, + upstreamListener: Future[AssociationEventListener]) extends TransportOperation + case object DisassociateUnderlying extends TransportOperation +} + +abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem) + extends AbstractTransportAdapter(wrappedTransport, system.dispatcher) { + import ActorTransportAdapter._ + private implicit val timeout = new Timeout(3 seconds) + + protected def managerName: String + protected def managerProps: Props + // The blocking call below is only called during the startup sequence. + protected val manager = Await.result(registerManager(), 3 seconds) + + private def registerManager(): Future[ActorRef] = + (system.actorFor("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef] + + protected def interceptListen(listenAddress: Address, + listenerPromise: Future[AssociationEventListener]): AssociationEventListener = { + manager ! ListenUnderlying(listenAddress, listenerPromise) + manager + } + + override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = + manager ! AssociateUnderlying(remoteAddress, statusPromise) + + override def shutdown(): Unit = manager ! PoisonPill +} + diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index c274dbffe4..9d1620cf43 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -18,6 +18,7 @@ import scala.util.control.NonFatal import scala.util.{ Success, Failure } import java.net.URLEncoder import scala.collection.immutable.Queue +import akka.remote.transport.ActorTransportAdapter._ class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) @@ -50,21 +51,6 @@ private[remote] object AkkaProtocolTransport { val AkkaOverhead: Int = 0 //Don't know yet val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0) - sealed trait TransportOperation - case class HandlerRegistered(handler: ActorRef) extends TransportOperation - case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation - case class ListenUnderlying(listenPromise: Promise[(Address, Promise[ActorRef])]) extends TransportOperation - case object DisassociateUnderlying extends TransportOperation - - def augmentScheme(originalScheme: String): String = s"$originalScheme.$AkkaScheme" - - def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol)) - - def removeScheme(scheme: String): String = if (scheme.endsWith(s".$AkkaScheme")) - scheme.take(scheme.length - AkkaScheme.length - 1) - else scheme - - def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol)) } /** @@ -92,42 +78,19 @@ private[remote] object AkkaProtocolTransport { * the codec that will be used to encode/decode Akka PDUs */ private[remote] class AkkaProtocolTransport( - private val wrappedTransport: Transport, + wrappedTransport: Transport, private val system: ActorSystem, private val settings: AkkaProtocolSettings, - private val codec: AkkaPduCodec) extends Transport { + private val codec: AkkaPduCodec) extends ActorTransportAdapter(wrappedTransport, system) { - override val schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier) + override val addedSchemeIdentifier: String = AkkaScheme - override def isResponsibleFor(address: Address): Boolean = wrappedTransport.isResponsibleFor(removeScheme(address)) - - //TODO: make this the child of someone more appropriate - private val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props(new AkkaProtocolManager(wrappedTransport, settings)), - s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}") - - override val maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - AkkaProtocolTransport.AkkaOverhead - - override def listen: Future[(Address, Promise[ActorRef])] = { - // Prepare a future, and pass its promise to the manager - val listenPromise: Promise[(Address, Promise[ActorRef])] = Promise() - - manager ! ListenUnderlying(listenPromise) - - listenPromise.future - } - - override def associate(remoteAddress: akka.actor.Address): Future[Status] = { - // Prepare a future, and pass its promise to the manager - val statusPromise: Promise[Status] = Promise() - - manager ! AssociateUnderlying(remoteAddress, statusPromise) - - statusPromise.future - } - - override def shutdown(): Unit = manager ! PoisonPill + override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = + wrappedTransport.managementCommand(cmd, statusPromise) + override val maximumOverhead: Int = AkkaProtocolTransport.AkkaOverhead + protected def managerName = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}" + protected def managerProps = Props(new AkkaProtocolManager(wrappedTransport, settings)) } private[transport] class AkkaProtocolManager( @@ -145,32 +108,17 @@ private[transport] class AkkaProtocolManager( private val nextId = Iterator from 0 - private val associationHandlerPromise: Promise[ActorRef] = Promise() - associationHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self + var localAddress: Address = _ - @volatile var localAddress: Address = _ - - private var associationHandler: ActorRef = _ + private var associationHandler: AssociationEventListener = _ def receive: Receive = { - case ListenUnderlying(listenPromise) ⇒ - val listenFuture = wrappedTransport.listen + case ListenUnderlying(listenAddress, upstreamListenerFuture) ⇒ + localAddress = listenAddress + upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self - // - Receive the address and promise from original transport - // - then register ourselves as listeners - // - then complete the exposed promise with the modified contents - listenFuture.onComplete { - case Success((address, wrappedTransportHandlerPromise)) ⇒ - // Register ourselves as the handler for the wrapped transport's listen call - wrappedTransportHandlerPromise.success(self) - localAddress = address - // Pipe the result to the original caller - listenPromise.success((augmentScheme(address), associationHandlerPromise)) - case Failure(reason) ⇒ listenPromise.failure(reason) - } - - case HandlerRegistered(handler) ⇒ - associationHandler = handler + case ListenerRegistered(listener) ⇒ + associationHandler = listener context.become(ready) // Block inbound associations until handler is registered @@ -215,13 +163,13 @@ private[transport] class AkkaProtocolManager( } private[transport] class AkkaProtocolHandle( - val localAddress: Address, - val remoteAddress: Address, - val readHandlerPromise: Promise[ActorRef], - private val wrappedHandle: AssociationHandle, + _localAddress: Address, + _remoteAddress: Address, + val readHandlerPromise: Promise[HandleEventListener], + _wrappedHandle: AssociationHandle, private val stateActor: ActorRef, private val codec: AkkaPduCodec) - extends AssociationHandle { + extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) { override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) @@ -256,6 +204,8 @@ private[transport] object ProtocolStateActor { case object HeartbeatTimer + case class HandleListenerRegistered(listener: HandleEventListener) + sealed trait ProtocolStateData trait InitialProtocolStateData extends ProtocolStateData @@ -268,14 +218,15 @@ private[transport] object ProtocolStateActor { extends ProtocolStateData // The underlying transport is associated, but the handshake of the akka protocol is not yet finished - case class InboundUnassociated(associationHandler: ActorRef, wrappedHandle: AssociationHandle) + case class InboundUnassociated(associationListener: AssociationEventListener, wrappedHandle: AssociationHandle) extends InitialProtocolStateData // Both transports are associated, but the handler for the handle has not yet been provided - case class AssociatedWaitHandler(handlerFuture: Future[ActorRef], wrappedHandle: AssociationHandle, queue: Queue[ByteString]) + case class AssociatedWaitHandler(handleListener: Future[HandleEventListener], wrappedHandle: AssociationHandle, + queue: Queue[ByteString]) extends ProtocolStateData - case class HandlerReady(handler: ActorRef, wrappedHandle: AssociationHandle) + case class ListenerReady(listener: HandleEventListener, wrappedHandle: AssociationHandle) extends ProtocolStateData case object TimeoutReason @@ -305,16 +256,16 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // Inbound case def this(localAddress: Address, wrappedHandle: AssociationHandle, - associationHandler: ActorRef, + associationListener: AssociationEventListener, settings: AkkaProtocolSettings, codec: AkkaPduCodec, failureDetector: FailureDetector) = { - this(InboundUnassociated(associationHandler, wrappedHandle), localAddress, settings, codec, failureDetector) + this(InboundUnassociated(associationListener, wrappedHandle), localAddress, settings, codec, failureDetector) } initialData match { case d: OutboundUnassociated ⇒ - d.transport.associate(removeScheme(d.remoteAddress)) pipeTo self + d.transport.associate(d.remoteAddress) pipeTo self startWith(Closed, d) case d: InboundUnassociated ⇒ @@ -421,8 +372,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) ⇒ // Queue message until handler is registered stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ payload) - case HandlerReady(handler, _) ⇒ - handler ! InboundPayload(payload) + case ListenerReady(listener, _) ⇒ + listener notify InboundPayload(payload) stay() } @@ -430,15 +381,19 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } case Event(HeartbeatTimer, AssociatedWaitHandler(_, wrappedHandle, _)) ⇒ handleTimers(wrappedHandle) - case Event(HeartbeatTimer, HandlerReady(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) + case Event(HeartbeatTimer, ListenerReady(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) - case Event(DisassociateUnderlying, HandlerReady(handler, wrappedHandle)) ⇒ - sendDisassociate(wrappedHandle) + case Event(DisassociateUnderlying, _) ⇒ + val handle = stateData match { + case ListenerReady(_, wrappedHandle) ⇒ wrappedHandle + case AssociatedWaitHandler(_, wrappedHandle, _) ⇒ wrappedHandle + } + sendDisassociate(handle) stop() - case Event(HandlerRegistered(ref), AssociatedWaitHandler(_, wrappedHandle, queue)) ⇒ - queue.foreach { ref ! InboundPayload(_) } - stay() using HandlerReady(ref, wrappedHandle) + case Event(HandleListenerRegistered(listener), AssociatedWaitHandler(_, wrappedHandle, queue)) ⇒ + queue.foreach { listener notify InboundPayload(_) } + stay() using ListenerReady(listener, wrappedHandle) } private def initTimers(): Unit = { @@ -477,25 +432,26 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after // registration immediately signal a disassociate handlerFuture.onSuccess { - case handler: ActorRef ⇒ handler ! Disassociated + case listener: HandleEventListener ⇒ listener notify Disassociated } - case StopEvent(_, _, HandlerReady(handler, wrappedHandle)) ⇒ - handler ! Disassociated + case StopEvent(_, _, ListenerReady(handler, wrappedHandle)) ⇒ + handler notify Disassociated wrappedHandle.disassociate() case StopEvent(_, _, InboundUnassociated(_, wrappedHandle)) ⇒ wrappedHandle.disassociate() } - private def notifyOutboundHandler(wrappedHandle: AssociationHandle, statusPromise: Promise[Status]): Future[ActorRef] = { - val readHandlerPromise: Promise[ActorRef] = Promise() - readHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self + private def notifyOutboundHandler(wrappedHandle: AssociationHandle, + statusPromise: Promise[Status]): Future[HandleEventListener] = { + val readHandlerPromise: Promise[HandleEventListener] = Promise() + readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self val exposedHandle = new AkkaProtocolHandle( - augmentScheme(localAddress), - augmentScheme(wrappedHandle.remoteAddress), + localAddress, + wrappedHandle.remoteAddress, readHandlerPromise, wrappedHandle, self, @@ -505,20 +461,22 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat readHandlerPromise.future } - private def notifyInboundHandler(wrappedHandle: AssociationHandle, originAddress: Address, associationHandler: ActorRef): Future[ActorRef] = { - val readHandlerPromise: Promise[ActorRef] = Promise() - readHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self + private def notifyInboundHandler(wrappedHandle: AssociationHandle, + originAddress: Address, + associationListener: AssociationEventListener): Future[HandleEventListener] = { + val readHandlerPromise: Promise[HandleEventListener] = Promise() + readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self val exposedHandle = new AkkaProtocolHandle( - augmentScheme(localAddress), - augmentScheme(originAddress), + localAddress, + originAddress, readHandlerPromise, wrappedHandle, self, codec) - associationHandler ! InboundAssociation(exposedHandle) + associationListener notify InboundAssociation(exposedHandle) readHandlerPromise.future } diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala new file mode 100644 index 0000000000..d0cb9b3db0 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -0,0 +1,138 @@ +package akka.remote.transport + +import FailureInjectorTransportAdapter._ +import akka.AkkaException +import akka.actor.{ Address, ExtendedActorSystem } +import akka.event.Logging +import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener } +import akka.remote.transport.Transport._ +import akka.util.ByteString +import java.util.concurrent.ConcurrentHashMap +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.{ Future, Promise } + +case class FailureInjectorException(msg: String) extends AkkaException(msg) + +class FailureInjectorProvider extends TransportAdapterProvider { + + def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = + new FailureInjectorTransportAdapter(wrappedTransport, system) + +} + +private[remote] object FailureInjectorTransportAdapter { + val FailureInjectorSchemeIdentifier = "gremlin" + + trait FailureInjectorCommand + case class All(mode: GremlinMode) + case class One(remoteAddress: Address, mode: GremlinMode) + + sealed trait GremlinMode + case object PassThru extends GremlinMode + case class Drop(outboundDropP: Double, inboundDropP: Double) extends GremlinMode +} + +private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem) + extends AbstractTransportAdapter(wrappedTransport, extendedSystem.dispatcher) with AssociationEventListener { + + import extendedSystem.dispatcher + + private val rng = ThreadLocalRandom.current() + private val log = Logging(extendedSystem, "FailureInjector (gremlin)") + + @volatile private var upstreamListener: Option[AssociationEventListener] = None + private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]() + @volatile private var allMode: GremlinMode = PassThru + + override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier + protected def maximumOverhead = 0 + + override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { + case All(mode) ⇒ + allMode = mode + statusPromise.success(true) + case One(address, mode) ⇒ + // don't care about the protocol part - we are injected in the stack anyway! + addressChaosTable.put(address.copy(protocol = "", system = ""), mode) + statusPromise.success(true) + case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + } + + protected def interceptListen(listenAddress: Address, + listenerFuture: Future[AssociationEventListener]): AssociationEventListener = { + log.warning("FailureInjectorTransport is active on this system. Gremlins might munch your packets.") + listenerFuture.onSuccess { + case listener: AssociationEventListener ⇒ upstreamListener = Some(listener) + } + this + } + + protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = { + // Association is simulated to be failed if there was either an inbound or outbound message drop + if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress)) + statusPromise.success(Fail(new FailureInjectorException("Simulated failure of association to " + remoteAddress))) + else + statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { + _ match { + case Ready(handle) ⇒ + addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru) + Ready(new FailureInjectorHandle(handle, this)) + case s: Status ⇒ s + } + }) + } + + def notify(ev: AssociationEvent): Unit = ev match { + case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress) ⇒ //Ignore + case _ ⇒ upstreamListener match { + case Some(listener) ⇒ listener notify interceptInboundAssociation(ev) + case None ⇒ + } + } + + def interceptInboundAssociation(ev: AssociationEvent): AssociationEvent = ev match { + case InboundAssociation(handle) ⇒ InboundAssociation(FailureInjectorHandle(handle, this)) + case _ ⇒ ev + } + + def shouldDropInbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match { + case PassThru ⇒ false + case Drop(_, inboundDropP) ⇒ rng.nextDouble() <= inboundDropP + } + + def shouldDropOutbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match { + case PassThru ⇒ false + case Drop(outboundDropP, _) ⇒ rng.nextDouble() <= outboundDropP + } + + def chaosMode(remoteAddress: Address): GremlinMode = { + val mode = addressChaosTable.get(remoteAddress.copy(protocol = "", system = "")) + if (mode eq null) PassThru else mode + } +} + +private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHandle, + private val gremlinAdapter: FailureInjectorTransportAdapter) + extends AbstractTransportAdapterHandle(_wrappedHandle, FailureInjectorSchemeIdentifier) + with HandleEventListener { + import gremlinAdapter.extendedSystem.dispatcher + + @volatile private var upstreamListener: HandleEventListener = null + + override val readHandlerPromise: Promise[HandleEventListener] = Promise() + readHandlerPromise.future.onSuccess { + case listener: HandleEventListener ⇒ + upstreamListener = listener + wrappedHandle.readHandlerPromise.success(this) + } + + override def write(payload: ByteString): Boolean = if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) + wrappedHandle.write(payload) + else true + + override def disassociate(): Unit = wrappedHandle.disassociate() + + override def notify(ev: HandleEvent): Unit = if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress)) + upstreamListener notify ev + +} diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala similarity index 78% rename from akka-remote/src/test/scala/akka/remote/transport/TestTransport.scala rename to akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index d49b40c444..76fb8ba0ed 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -10,9 +10,159 @@ import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap } import scala.concurrent.duration._ import scala.concurrent.{ Await, Future, Promise } -// Default EC is used, but this is just a test utility -- please forgive... import scala.concurrent.ExecutionContext.Implicits.global +/** + * Transport implementation to be used for testing. + * + * The TestTransport is basically a shared memory between actor systems. The TestTransport could be programmed to + * emulate different failure modes of a Transport implementation. TestTransport keeps a log of the activities it was + * requested to do. This class is not optimized for performace and MUST not be used as an in-memory transport in + * production systems. + */ +class TestTransport( + val localAddress: Address, + final val registry: AssociationRegistry, + val maximumPayloadBytes: Int = 32000, + val schemeIdentifier: String = "test") extends Transport { + + def this(system: ExtendedActorSystem, conf: Config) = { + this( + AddressFromURIString(conf.getString("local-address")), + AssociationRegistry.get(conf.getString("registry-key")), + conf.getBytes("maximum-payload-bytes").toInt, + conf.getString("scheme-identifier")) + } + + import akka.remote.transport.TestTransport._ + + override def isResponsibleFor(address: Address): Boolean = true + + private val associationListenerPromise = Promise[AssociationEventListener]() + + private def defaultListen: Future[(Address, Promise[AssociationEventListener])] = { + associationListenerPromise.future.onSuccess { + case listener: AssociationEventListener ⇒ registry.registerTransport(this, listener) + } + Promise.successful((localAddress, associationListenerPromise)).future + } + + private def defaultAssociate(remoteAddress: Address): Future[Status] = { + registry.transportFor(remoteAddress) match { + + case Some((remoteTransport, listener)) ⇒ + val (localHandle, remoteHandle) = createHandlePair(remoteTransport, remoteAddress) + + val bothSides: Future[(HandleEventListener, HandleEventListener)] = for ( + listener1 ← localHandle.readHandlerPromise.future; + listener2 ← remoteHandle.readHandlerPromise.future + ) yield (listener1, listener2) + + registry.registerListenerPair(localHandle.key, bothSides) + listener notify InboundAssociation(remoteHandle) + + Promise.successful(Ready(localHandle)).future + + case None ⇒ + Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future + } + } + + private def createHandlePair(remoteTransport: TestTransport, remoteAddress: Address): (TestAssociationHandle, TestAssociationHandle) = { + val localHandle = new TestAssociationHandle(localAddress, remoteAddress, this, inbound = false) + val remoteHandle = new TestAssociationHandle(remoteAddress, localAddress, remoteTransport, inbound = true) + + (localHandle, remoteHandle) + } + + private def defaultShutdown: Future[Unit] = Promise.successful(()).future + + /** + * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method. + */ + val listenBehavior = new SwitchableLoggedBehavior[Unit, (Address, Promise[AssociationEventListener])]( + (_) ⇒ defaultListen, + (_) ⇒ registry.logActivity(ListenAttempt(localAddress))) + + /** + * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method. + */ + val associateBehavior = new SwitchableLoggedBehavior[Address, Status]( + defaultAssociate _, + (remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress))) + + /** + * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method. + */ + val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit]( + (_) ⇒ defaultShutdown, + (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress))) + + override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior() + override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress) + override def shutdown(): Unit = shutdownBehavior() + + private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { + registry.getRemoteReadHandlerFor(params._1) match { + case Some(futureActor) ⇒ + val writePromise = Promise[Boolean]() + futureActor.onSuccess { + case listener ⇒ listener notify InboundPayload(params._2); writePromise.success(true) + } + writePromise.future + case None ⇒ + Promise.failed(new IllegalStateException("No association present")).future + } + } + + private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = { + registry.deregisterAssociation(handle.key).foreach { + case f: Future[(HandleEventListener, HandleEventListener)] ⇒ f.onSuccess { + case (listener1, listener2) ⇒ + (if (handle.inbound) listener1 else listener2) notify Disassociated + } + + } + Promise.successful(()).future + } + + /** + * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the write() method on handles. All + * handle calls pass through this call. Please note, that write operations return a Boolean synchronously, so + * altering the behavior via pushDelayed will turn write to a blocking operation -- use of pushDelayed therefore + * is not recommended. + */ + val writeBehavior = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean]( + defaultBehavior = { + defaultWrite _ + }, + logCallback = { + case (handle, payload) ⇒ + registry.logActivity(WriteAttempt(handle.localAddress, handle.remoteAddress, payload)) + }) + + /** + * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the disassociate() method on handles. All + * handle calls pass through this call. + */ + val disassociateBehavior = new SwitchableLoggedBehavior[TestAssociationHandle, Unit]( + defaultBehavior = { + defaultDisassociate _ + }, + logCallback = { + (handle) ⇒ + registry.logActivity(DisassociateAttempt(handle.localAddress, handle.remoteAddress)) + }) + + private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean = + Await.result(writeBehavior((handle, payload)), 3 seconds) + + private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle) + + override def toString: String = s"TestTransport($localAddress)" + +} + object TestTransport { type Behavior[A, B] = (A) ⇒ Future[B] @@ -133,8 +283,8 @@ object TestTransport { class AssociationRegistry { private val activityLog = new CopyOnWriteArrayList[Activity]() - private val transportTable = new ConcurrentHashMap[Address, (TestTransport, ActorRef)]() - private val handlersTable = new ConcurrentHashMap[(Address, Address), Future[(ActorRef, ActorRef)]]() + private val transportTable = new ConcurrentHashMap[Address, (TestTransport, AssociationEventListener)]() + private val listenersTable = new ConcurrentHashMap[(Address, Address), Future[(HandleEventListener, HandleEventListener)]]() /** * Logs a transport activity. @@ -167,15 +317,15 @@ object TestTransport { } /** - * Records a mapping between an address and the corresponding (transport, actor) pair. + * Records a mapping between an address and the corresponding (transport, associationEventListener) pair. * * @param transport * The transport that is to be registered. The address of this transport will be used as key. - * @param responsibleActor - * The actor that will handle the events for the given transport. + * @param associationEventListener + * The listener that will handle the events for the given transport. */ - def registerTransport(transport: TestTransport, responsibleActor: ActorRef): Unit = { - transportTable.put(transport.localAddress, (transport, responsibleActor)) + def registerTransport(transport: TestTransport, associationEventListener: AssociationEventListener): Unit = { + transportTable.put(transport.localAddress, (transport, associationEventListener)) } /** @@ -187,23 +337,23 @@ object TestTransport { * @return * True if all transports are successfully registered. */ - def transportsReady(transports: TestTransport*): Boolean = { - transports forall { - t ⇒ transportTable.containsKey(t.localAddress) + def transportsReady(addresses: Address*): Boolean = { + addresses forall { + transportTable.containsKey(_) } } /** - * Registers a Future of two actors corresponding to the two endpoints of an association. + * Registers a Future of two handle event listeners corresponding to the two endpoints of an association. * * @param key * Ordered pair of addresses representing an association. First element must be the address of the initiator. - * @param readHandlers - * The future containing the actors that will be responsible for handling the events of the two endpoints of the + * @param listeners + * The future containing the listeners that will be responsible for handling the events of the two endpoints of the * association. Elements in the pair must be in the same order as the addresses in the key parameter. */ - def registerHandlePair(key: (Address, Address), readHandlers: Future[(ActorRef, ActorRef)]): Unit = { - handlersTable.put(key, readHandlers) + def registerListenerPair(key: (Address, Address), listeners: Future[(HandleEventListener, HandleEventListener)]): Unit = { + listenersTable.put(key, listeners) } /** @@ -213,8 +363,8 @@ object TestTransport { * @return * The original entries. */ - def deregisterAssociation(key: (Address, Address)): Option[Future[(ActorRef, ActorRef)]] = - Option(handlersTable.remove(key)) + def deregisterAssociation(key: (Address, Address)): Option[Future[(HandleEventListener, HandleEventListener)]] = + Option(listenersTable.remove(key)) /** * Tests if an association was registered. @@ -225,19 +375,19 @@ object TestTransport { * @return True if there is an association for the given addresses. */ def existsAssociation(initiatorAddress: Address, remoteAddress: Address): Boolean = { - handlersTable.containsKey((initiatorAddress, remoteAddress)) + listenersTable.containsKey((initiatorAddress, remoteAddress)) } /** - * Returns the event handler actor corresponding to the remote endpoint of the given local handle. In other words - * it returns the actor that will receive InboundPayload events when {{{write()}}} is called on the given handle. + * Returns the event handler corresponding to the remote endpoint of the given local handle. In other words + * it returns the listener that will receive InboundPayload events when {{{write()}}} is called on the given handle. * * @param localHandle The handle - * @return The option that contains the Future for the handler actor if exists. + * @return The option that contains the Future for the listener if exists. */ - def getRemoteReadHandlerFor(localHandle: TestAssociationHandle): Option[Future[ActorRef]] = { - Option(handlersTable.get(localHandle.key)) map { - case pairFuture: Future[(ActorRef, ActorRef)] ⇒ if (localHandle.inbound) { + def getRemoteReadHandlerFor(localHandle: TestAssociationHandle): Option[Future[HandleEventListener]] = { + Option(listenersTable.get(localHandle.key)) map { + case pairFuture: Future[(HandleEventListener, HandleEventListener)] ⇒ if (localHandle.inbound) { pairFuture.map { _._1 } } else { pairFuture.map { _._2 } @@ -251,7 +401,8 @@ object TestTransport { * @param address The address bound to the transport. * @return The transport if exists. */ - def transportFor(address: Address): Option[(TestTransport, ActorRef)] = Option(transportTable.get(address)) + def transportFor(address: Address): Option[(TestTransport, AssociationEventListener)] = + Option(transportTable.get(address)) /** * Resets the state of the registry. ''Warning!'' This method is not atomic. @@ -259,19 +410,19 @@ object TestTransport { def reset(): Unit = { clearLog() transportTable.clear() - handlersTable.clear() + listenersTable.clear() } } } /* - NOTE: This is a global shared state between different actor systems. The purpose of this class is to allow dynamically - loaded TestTransports to set up a shared AssociationRegistry. Extensions could not be used for this purpose, as the injection - of the shared instance must happen during the startup time of the actor system. Association registries are looked - up via a string key. Until we find a better way to inject an AssociationRegistry to multiple actor systems it is - strongly recommended to use long, randomly generated strings to key the registry to avoid interference between tests. - */ + NOTE: This is a global shared state between different actor systems. The purpose of this class is to allow dynamically + loaded TestTransports to set up a shared AssociationRegistry. Extensions could not be used for this purpose, as the injection + of the shared instance must happen during the startup time of the actor system. Association registries are looked + up via a string key. Until we find a better way to inject an AssociationRegistry to multiple actor systems it is + strongly recommended to use long, randomly generated strings to key the registry to avoid interference between tests. +*/ object AssociationRegistry { private final val registries = scala.collection.mutable.Map[String, AssociationRegistry]() @@ -282,165 +433,13 @@ object AssociationRegistry { def clear(): Unit = this.synchronized { registries.clear() } } -/** - * Transport implementation to be used for testing. - * - * The TestTransport is basically a shared memory between actor systems. The TestTransport could be programmed to - * emulate different failure modes of a Transport implementation. TestTransport keeps a log of the activities it was - * requested to do. This class is not optimized for performace and MUST not be used as an in-memory transport in - * production systems. - */ -class TestTransport( - val localAddress: Address, - final val registry: AssociationRegistry, - val maximumPayloadBytes: Int = 32000, - val schemeIdentifier: String = "test") extends Transport { - - def this(system: ExtendedActorSystem, conf: Config) = { - this( - AddressFromURIString(conf.getString("local-address")), - AssociationRegistry.get(conf.getString("registry-key")), - conf.getBytes("maximum-payload-bytes").toInt, - conf.getString("scheme-identifier")) - } - - import akka.remote.transport.TestTransport._ - - override def isResponsibleFor(address: Address): Boolean = true - - private val actorPromise = Promise[ActorRef]() - - private def defaultListen: Future[(Address, Promise[ActorRef])] = { - actorPromise.future.onSuccess { - case actorRef: ActorRef ⇒ registry.registerTransport(this, actorRef) - } - Promise.successful((localAddress, actorPromise)).future - } - - private def defaultAssociate(remoteAddress: Address): Future[Status] = { - registry.transportFor(remoteAddress) match { - - case Some((remoteTransport, actor)) ⇒ - val (localHandle, remoteHandle) = createHandlePair(remoteTransport, remoteAddress) - - val bothSides: Future[(ActorRef, ActorRef)] = for ( - actor1 ← localHandle.readHandlerPromise.future; - actor2 ← remoteHandle.readHandlerPromise.future - ) yield (actor1, actor2) - - registry.registerHandlePair(localHandle.key, bothSides) - actor ! InboundAssociation(remoteHandle) - - Promise.successful(Ready(localHandle)).future - - case None ⇒ - Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future - } - } - - private def createHandlePair(remoteTransport: TestTransport, remoteAddress: Address): (TestAssociationHandle, TestAssociationHandle) = { - val localHandle = new TestAssociationHandle(localAddress, remoteAddress, this, inbound = false) - val remoteHandle = new TestAssociationHandle(remoteAddress, localAddress, remoteTransport, inbound = true) - - (localHandle, remoteHandle) - } - - private def defaultShutdown: Future[Unit] = Promise.successful(()).future - - /** - * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method. - */ - val listenBehavior = new SwitchableLoggedBehavior[Unit, (Address, Promise[ActorRef])]( - (_) ⇒ defaultListen, - (_) ⇒ registry.logActivity(ListenAttempt(localAddress))) - - /** - * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method. - */ - val associateBehavior = new SwitchableLoggedBehavior[Address, Status]( - defaultAssociate _, - (remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress))) - - /** - * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method. - */ - val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit]( - (_) ⇒ defaultShutdown, - (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress))) - - override def listen: Future[(Address, Promise[ActorRef])] = listenBehavior() - override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress) - override def shutdown(): Unit = shutdownBehavior() - - private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { - registry.getRemoteReadHandlerFor(params._1) match { - case Some(futureActor) ⇒ - val writePromise = Promise[Boolean]() - futureActor.onSuccess { - case actor ⇒ actor ! InboundPayload(params._2); writePromise.success(true) - } - writePromise.future - case None ⇒ - Promise.failed(new IllegalStateException("No association present")).future - } - } - - private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = { - registry.deregisterAssociation(handle.key).foreach { - case f: Future[(ActorRef, ActorRef)] ⇒ f.onSuccess { - case (handler1, handler2) ⇒ - val handler = if (handle.inbound) handler2 else handler1 - handler ! Disassociated - } - - } - Promise.successful(()).future - } - - /** - * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the write() method on handles. All - * handle calls pass through this call. Please note, that write operations return a Boolean synchronously, so - * altering the behavior via pushDelayed will turn write to a blocking operation -- use of pushDelayed therefore - * is not recommended. - */ - val writeBehavior = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean]( - defaultBehavior = { - defaultWrite _ - }, - logCallback = { - case (handle, payload) ⇒ - registry.logActivity(WriteAttempt(handle.localAddress, handle.remoteAddress, payload)) - }) - - /** - * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the disassociate() method on handles. All - * handle calls pass through this call. - */ - val disassociateBehavior = new SwitchableLoggedBehavior[TestAssociationHandle, Unit]( - defaultBehavior = { - defaultDisassociate _ - }, - logCallback = { - (handle) ⇒ - registry.logActivity(DisassociateAttempt(handle.localAddress, handle.remoteAddress)) - }) - - private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean = - Await.result(writeBehavior((handle, payload)), 3 seconds) - - private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle) - - override def toString: String = s"TestTransport($localAddress)" - -} - case class TestAssociationHandle( localAddress: Address, remoteAddress: Address, transport: TestTransport, inbound: Boolean) extends AssociationHandle { - override val readHandlerPromise: Promise[ActorRef] = Promise() + override val readHandlerPromise: Promise[HandleEventListener] = Promise() override def write(payload: ByteString): Boolean = transport.write(this, payload) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala new file mode 100644 index 0000000000..15e398f6e9 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -0,0 +1,425 @@ +package akka.remote.transport + +import ThrottlerTransportAdapter._ +import akka.actor._ +import akka.pattern.pipe +import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying +import akka.remote.transport.ActorTransportAdapter.ListenUnderlying +import akka.remote.transport.ActorTransportAdapter.ListenerRegistered +import akka.remote.transport.AkkaPduCodec.Associate +import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload, HandleEventListener } +import akka.remote.transport.ThrottledAssociation._ +import akka.remote.transport.ThrottlerManager.Checkin +import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle +import akka.remote.transport.Transport._ +import akka.util.ByteString +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.Promise +import scala.math.min +import scala.util.Success +import scala.util.control.NonFatal +import scala.concurrent.duration._ + +class ThrottlerProvider extends TransportAdapterProvider { + + def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = + new ThrottlerTransportAdapter(wrappedTransport, system) + +} + +object ThrottlerTransportAdapter { + val SchemeIdentifier = "trttl" + val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0) + + sealed trait Direction { + def includes(other: Direction): Boolean + } + + object Direction { + case object Send extends Direction { + override def includes(other: Direction): Boolean = other match { + case Send ⇒ true + case _ ⇒ false + } + } + case object Receive extends Direction { + override def includes(other: Direction): Boolean = other match { + case Receive ⇒ true + case _ ⇒ false + } + } + case object Both extends Direction { + override def includes(other: Direction): Boolean = true + } + } + + case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode) + + sealed trait ThrottleMode { + def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) + def timeToAvailable(currentTime: Long, tokens: Int): Long + } + + case class TokenBucket(capacity: Int, tokensPerSecond: Double, lastSend: Long, availableTokens: Int) + extends ThrottleMode { + + private def isAvailable(timeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) { + true // Allow messages larger than capacity through, it will be recorded as negative tokens + } else min((availableTokens + tokensGenerated(timeOfSend)), capacity) >= tokens + + override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = { + if (isAvailable(timeOfSend, tokens)) + (this.copy( + lastSend = timeOfSend, + availableTokens = min(availableTokens - tokens + tokensGenerated(timeOfSend), capacity)), true) + else (this, false) + } + + override def timeToAvailable(currentTime: Long, tokens: Int): Long = { + val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime) + TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong) + } + + private def tokensGenerated(timeOfSend: Long): Int = + (TimeUnit.NANOSECONDS.toMillis(timeOfSend - lastSend) * tokensPerSecond / 1000.0).toInt + } + + case object Unthrottled extends ThrottleMode { + + override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true) + override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L + } + + case object Blackhole extends ThrottleMode { + override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false) + override def timeToAvailable(currentTime: Long, tokens: Int): Long = 0L + } +} + +class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem) + extends ActorTransportAdapter(_wrappedTransport, _system) { + + override protected def addedSchemeIdentifier = SchemeIdentifier + override protected def maximumOverhead = 0 + protected def managerName = s"throttlermanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}" + protected def managerProps = Props(new ThrottlerManager(wrappedTransport)) + + override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match { + case s @ SetThrottle(_, _, _) ⇒ + manager ! s + statusPromise.success(true) + case _ ⇒ wrappedTransport.managementCommand(cmd, statusPromise) + } +} + +private[transport] object ThrottlerManager { + case class OriginResolved() + case class Checkin(origin: Address, handle: ThrottlerHandle) +} + +private[transport] class ThrottlerManager(wrappedTransport: Transport) extends Actor { + + import context.dispatcher + + private val ids = Iterator from 0 + var localAddress: Address = _ + private var associationHandler: AssociationEventListener = _ + private var throttlingModes = Map[Address, (ThrottleMode, Direction)]() + private var handleTable = Map[Address, ThrottlerHandle]() + + private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "") + + override def postStop(): Unit = wrappedTransport.shutdown() + + def receive: Receive = { + case ListenUnderlying(listenAddress, upstreamListenerFuture) ⇒ + localAddress = listenAddress + upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self + + case ListenerRegistered(listener) ⇒ + associationHandler = listener + context.become(ready) + + // Block inbound associations until handler is registered + case InboundAssociation(handle) ⇒ + handle.disassociate() + } + + private def ready: Receive = { + case InboundAssociation(handle) ⇒ + val wrappedHandle = wrapHandle(handle, true) + wrappedHandle.throttlerActor ! wrappedHandle + case AssociateUnderlying(remoteAddress, statusPromise) ⇒ + wrappedTransport.associate(remoteAddress).onComplete { + case Success(Ready(handle)) ⇒ + val wrappedHandle = wrapHandle(handle, false) + val inMode = getInboundMode(nakedAddress(remoteAddress)) + wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress))) + wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor + + statusPromise.success(Ready(wrappedHandle)) + case s @ _ ⇒ statusPromise.complete(s) + } + case s @ SetThrottle(address, direction, mode) ⇒ + val naked = nakedAddress(address) + throttlingModes += naked -> (mode, direction) + handleTable.get(naked) match { + case Some(handle) ⇒ setMode(handle, mode, direction) + case None ⇒ + } + case Checkin(origin, handle) ⇒ + val naked: Address = nakedAddress(origin) + handleTable += naked -> handle + setMode(naked, handle) + + } + + private def getInboundMode(nakedAddress: Address): ThrottleMode = { + throttlingModes.get(nakedAddress) match { + case Some((mode, direction)) if direction.includes(Direction.Receive) ⇒ mode + case _ ⇒ Unthrottled + } + } + + private def getOutboundMode(nakedAddress: Address): ThrottleMode = { + throttlingModes.get(nakedAddress) match { + case Some((mode, direction)) if direction.includes(Direction.Send) ⇒ mode + case _ ⇒ Unthrottled + } + } + + private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = { + throttlingModes.get(nakedAddress) match { + case Some((mode, direction)) ⇒ setMode(handle, mode, direction) + case None ⇒ setMode(handle, Unthrottled, Direction.Both) + } + } + + private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = { + if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode + if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) + } + + private def wrapHandle(originalHandle: AssociationHandle, inbound: Boolean): ThrottlerHandle = { + val throttlerActor = context.actorOf(Props(new ThrottledAssociation(self, associationHandler, originalHandle, inbound)), + "throttler" + ids.next()) + val handle = ThrottlerHandle(originalHandle, throttlerActor) + handleTable += nakedAddress(originalHandle.remoteAddress) -> handle + handle + } + +} + +object ThrottledAssociation { + case object Dequeue + + sealed trait ThrottlerState + + // --- Chain of states for inbound associations + + // Waiting for the ThrottlerHandle coupled with the throttler actor. + case object WaitExposedHandle extends ThrottlerState + // Waiting for the ASSOCIATE message that contains the origin address of the remote endpoint + case object WaitOrigin extends ThrottlerState + // After origin is known and a Checkin message is sent to the manager, we must wait for the ThrottlingMode for the + // address + case object WaitMode extends ThrottlerState + // After all information is known, the throttler must wait for the upstream listener to be able to forward messages + case object WaitUpstreamListener extends ThrottlerState + + // --- States for outbound associations + + // Waiting for the tuple containing the upstream listener and ThrottleMode + case object WaitModeAndUpstreamListener extends ThrottlerState + + // Fully initialized state + case object Throttling extends ThrottlerState + + sealed trait ThrottlerData + case object Uninitialized extends ThrottlerData + case class ExposedHandle(handle: ThrottlerHandle) extends ThrottlerData +} + +private[transport] class ThrottledAssociation( + val manager: ActorRef, + val associationHandler: AssociationEventListener, + val originalHandle: AssociationHandle, + val inbound: Boolean) + extends Actor with LoggingFSM[ThrottlerState, ThrottlerData] { + import context.dispatcher + + var inboundThrottleMode: ThrottleMode = _ + var queue = Queue.empty[ByteString] + var upstreamListener: HandleEventListener = _ + + override def postStop(): Unit = originalHandle.disassociate() + + if (inbound) startWith(WaitExposedHandle, Uninitialized) else { + originalHandle.readHandlerPromise.success(self) + startWith(WaitModeAndUpstreamListener, Uninitialized) + } + + when(WaitExposedHandle) { + case Event(handle: ThrottlerHandle, Uninitialized) ⇒ + // register to downstream layer and wait for origin + originalHandle.readHandlerPromise.success(self) + goto(WaitOrigin) using ExposedHandle(handle) + } + + when(WaitOrigin) { + case Event(InboundPayload(p), ExposedHandle(exposedHandle)) ⇒ + queue = queue enqueue p + peekOrigin(p) match { + case Some(origin) ⇒ + manager ! Checkin(origin, exposedHandle) + goto(WaitMode) + case None ⇒ stay() + } + } + + when(WaitMode) { + case Event(InboundPayload(p), _) ⇒ + queue = queue enqueue p + stay() + case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) ⇒ + inboundThrottleMode = mode + if (inboundThrottleMode == Blackhole) { + queue = Queue.empty[ByteString] + exposedHandle.disassociate() + stop() + } else { + associationHandler notify InboundAssociation(exposedHandle) + exposedHandle.readHandlerPromise.future pipeTo self + goto(WaitUpstreamListener) + } + } + + when(WaitUpstreamListener) { + case Event(InboundPayload(p), _) ⇒ + queue = queue enqueue p + stay() + case Event(listener: HandleEventListener, _) ⇒ + upstreamListener = listener + self ! Dequeue + goto(Throttling) + } + + when(WaitModeAndUpstreamListener) { + case Event((listener: HandleEventListener, mode: ThrottleMode), _) ⇒ + upstreamListener = listener + inboundThrottleMode = mode + self ! Dequeue + goto(Throttling) + case Event(InboundPayload(p), _) ⇒ + queue = queue enqueue p + stay() + } + + when(Throttling) { + case Event(mode: ThrottleMode, _) ⇒ + inboundThrottleMode = mode + if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString] + stay() + case Event(InboundPayload(p), _) ⇒ + forwardOrDelay(p) + stay() + + case Event(Dequeue, _) ⇒ + if (!queue.isEmpty) { + val (payload, newqueue) = queue.dequeue + upstreamListener notify InboundPayload(payload) + queue = newqueue + inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1 + if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue + else if (!queue.isEmpty) { + context.system.scheduler.scheduleOnce( + inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length) nanoseconds, self, Dequeue) + } + } + stay() + + } + + whenUnhandled { + case Event(Disassociated, _) ⇒ + if (upstreamListener ne null) upstreamListener notify Disassociated + originalHandle.disassociate() + stop() + + } + + // This method captures ASSOCIATE packets and extracts the origin address + private def peekOrigin(b: ByteString): Option[Address] = { + try { + AkkaPduProtobufCodec.decodePdu(b) match { + case Associate(_, origin) ⇒ Some(origin) + case _ ⇒ None + } + } catch { + // This layer should not care about malformed packets. Also, this also useful for testing, because + // arbitrary payload could be passed in + case NonFatal(e) ⇒ None + } + } + + def forwardOrDelay(payload: ByteString): Unit = { + if (inboundThrottleMode == Blackhole) { + // Do nothing + } else { + if (queue.isEmpty) { + val tokens = payload.length + val (newbucket, success) = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), tokens) + if (success) { + inboundThrottleMode = newbucket + upstreamListener notify InboundPayload(payload) + } else { + queue = queue.enqueue(payload) + + context.system.scheduler.scheduleOnce( + inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens) nanoseconds, self, Dequeue) + } + } else { + queue = queue.enqueue(payload) + } + } + } + +} + +private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef) + extends AbstractTransportAdapterHandle(_wrappedHandle, SchemeIdentifier) { + + private[transport] val outboundThrottleMode = new AtomicReference[ThrottleMode](Unthrottled) + + override val readHandlerPromise: Promise[HandleEventListener] = Promise() + + override def write(payload: ByteString): Boolean = { + val tokens = payload.length + + @tailrec def tryConsume(currentBucket: ThrottleMode): Boolean = { + val timeOfSend = System.nanoTime() + val (newBucket, allow) = currentBucket.tryConsumeTokens(timeOfSend, tokens) + if (allow) { + if (outboundThrottleMode.compareAndSet(currentBucket, newBucket)) true + else tryConsume(outboundThrottleMode.get()) + } else false + } + + outboundThrottleMode.get match { + case Blackhole ⇒ true + case bucket @ _ ⇒ + val success = tryConsume(outboundThrottleMode.get()) + if (success) wrappedHandle.write(payload) + success + } + + } + + override def disassociate(): Unit = { + throttlerActor ! PoisonPill + } + +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 3ef6908f49..8ade317dd5 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -3,23 +3,27 @@ package akka.remote.transport import concurrent.{ Promise, Future } import akka.actor.{ ActorRef, Address } import akka.util.ByteString +import akka.remote.transport.Transport.AssociationEvent +import akka.remote.transport.AssociationHandle.HandleEventListener object Transport { + trait AssociationEvent + /** * Represents fine grained status of an association attempt. */ - sealed trait Status + sealed trait Status extends AssociationEvent /** * Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address, - * hostname, etc.). Invalid association requests are impossible to recover. + * hostname, etc.). */ case class Invalid(cause: Throwable) extends Status /** - * The association setup has failed, but no information can be provided about the probability of the success of a - * setup retry. + * The association setup has failed, but it is not known that a recovery is possible or not. Generally it means + * that the transport gave up its attempts to associate, but a retry might be successful at a later time. * * @param cause Cause of the failure */ @@ -36,19 +40,42 @@ object Transport { case class Ready(association: AssociationHandle) extends Status /** - * Message sent to an actor registered to a transport (via the Promise returned by - * [[akka.remote.transport.Transport.listen]]) when an inbound association request arrives. + * Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport + * (via the Promise returned by [[akka.remote.transport.Transport.listen]]) when an inbound association request arrives. * * @param association * The handle for the inbound association. */ - case class InboundAssociation(association: AssociationHandle) + case class InboundAssociation(association: AssociationHandle) extends AssociationEvent + /** + * An interface that needs to be implemented by the user of a transport to listen to association events + */ + trait AssociationEventListener { + + /** + * Called by the transport to notify the listener about an AssociationEvent + * @param ev The AssociationEvent of the transport + */ + def notify(ev: AssociationEvent): Unit + } + + /** + * Class to convert ordinary [[akka.actor.ActorRef]] instances to an AssociationEventListener. The adapter will + * forward event objects as messages to the provided ActorRef. + * @param actor + */ + case class ActorAssociationEventListener(actor: ActorRef) extends AssociationEventListener { + override def notify(ev: AssociationEvent): Unit = actor ! ev + } + + implicit def actorRef2HandleEventListener(actor: ActorRef): AssociationEventListener = + ActorAssociationEventListener(actor) } /** - * An SPI layer for implementing asynchronous transport mechanisms. The transport is responsible for initializing the - * underlying transport mechanism and setting up logical links between transport entities. + * An SPI layer for implementing asynchronous transport mechanisms. The Transport is responsible for initializing the + * underlying transmission mechanism and setting up logical links between transport entities. * * Transport implementations that are loaded dynamically by the remoting must have a constructor that accepts a * [[com.typesafe.config.Config]] and an [[akka.actor.ExtendedActorSystem]] as parameters. @@ -86,14 +113,15 @@ trait Transport { /** * Asynchronously attempts to setup the transport layer to listen and accept incoming associations. The result of the * attempt is wrapped by a Future returned by this method. The pair contained in the future contains a Promise for an - * ActorRef. By completing this Promise with an ActorRef, that ActorRef becomes responsible for handling incoming - * associations. Until the Promise is not completed, no associations are processed. + * ActorRef. By completing this Promise with an [[akka.remote.transport.Transport.AssociationEventListener]], that + * listener becomes responsible for handling incoming associations. Until the Promise is not completed, no associations + * are processed. * * @return - * A Future containing a pair of the bound local address and a Promise of an ActorRef that must be fulfilled - * by the consumer of the future. + * A Future containing a pair of the bound local address and a Promise of an AssociationListener that must be + * completed by the consumer of the future. */ - def listen: Future[(Address, Promise[ActorRef])] + def listen: Future[(Address, Promise[AssociationEventListener])] /** * Asynchronously opens a logical duplex link between two Transport Entities over a network. It could be backed by a @@ -118,36 +146,69 @@ trait Transport { */ def shutdown(): Unit + /** + * This method allows upper layers to send management commands to the transport. It is the responsibility of the + * sender to send appropriate commands to different transport implementations. Unknown commands will be ignored. + * + * @param cmd Command message to the transport + * @return Future that succeeds when the command was handled or dropped + */ + def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = { statusPromise.success(false) } + } object AssociationHandle { /** - * Trait for events that the registered actor for an [[akka.remote.transport.AssociationHandle]] might receive. + * Trait for events that the registered listener for an [[akka.remote.transport.AssociationHandle]] might receive. */ - sealed trait AssociationEvent + sealed trait HandleEvent /** - * Message sent to the actor registered to an association (via the Promise returned by + * Message sent to the listener registered to an association (via the Promise returned by * [[akka.remote.transport.AssociationHandle.readHandlerPromise]]) when an inbound payload arrives. * * @param payload * The raw bytes that were sent by the remote endpoint. */ - case class InboundPayload(payload: ByteString) extends AssociationEvent + case class InboundPayload(payload: ByteString) extends HandleEvent { + override def toString: String = s"InboundPayload(size = ${payload.length} bytes)" + } /** - * Message sent to te actor registered to an association + * Message sent to the listener registered to an association */ - case object Disassociated extends AssociationEvent + case object Disassociated extends HandleEvent + /** + * An interface that needs to be implemented by the user of an [[akka.remote.transport.AssociationHandle]] + * to listen to association events. + */ + trait HandleEventListener { + /** + * Called by the transport to notify the listener about a HandleEvent + * @param ev The HandleEvent of the handle + */ + def notify(ev: HandleEvent): Unit + } + + /** + * Class to convert ordinary [[akka.actor.ActorRef]] instances to a HandleEventListener. The adapter will + * forward event objects as messages to the provided ActorRef. + * @param actor + */ + case class ActorHandleEventListener(actor: ActorRef) extends HandleEventListener { + override def notify(ev: HandleEvent): Unit = actor ! ev + } + + implicit def actorRef2HandleEventListener(actor: ActorRef): HandleEventListener = ActorHandleEventListener(actor) } /** - * An SPI layer for abstracting over logical links (associations) created by [[akka.remote.transport.Transport]]. + * An SPI layer for abstracting over logical links (associations) created by a [[akka.remote.transport.Transport]]. * Handles are responsible for providing an API for sending and receiving from the underlying channel. * - * To register an actor for processing incoming payload data, the actor must be registered by completing the Promise + * To register a listener for processing incoming payload data, the listener must be registered by completing the Promise * returned by [[akka.remote.transport.AssociationHandle#readHandlerPromise]]. Incoming data is not processed until * this registration takes place. */ @@ -170,22 +231,23 @@ trait AssociationHandle { def remoteAddress: Address /** - * The Promise returned by this call must be completed with an [[akka.actor.ActorRef]] to register an actor - * responsible for handling incoming payload. + * The Promise returned by this call must be completed with an [[akka.remote.transport.AssociationHandle.HandleEventListener]] + * to register a listener responsible for handling incoming payload. Until the listener is not registered the + * transport SHOULD buffer incoming messages. * * @return - * Promise of the ActorRef of the actor responsible for handling incoming data. + * Promise that must be completed with the listener responsible for handling incoming data. */ - def readHandlerPromise: Promise[ActorRef] + def readHandlerPromise: Promise[HandleEventListener] /** - * Asynchronously sends the specified payload to the remote endpoint. This method must be thread-safe as it might - * be called from different threads. This method must not block. + * Asynchronously sends the specified payload to the remote endpoint. This method MUST be thread-safe as it might + * be called from different threads. This method MUST NOT block. * * Writes guarantee ordering of messages, but not their reception. The call to write returns with * a Boolean indicating if the channel was ready for writes or not. A return value of false indicates that the * channel is not yet ready for delivery (e.g.: the write buffer is full) and the sender needs to wait - * until the channel becomes ready again. Returning false also means that the current write was dropped (this is + * until the channel becomes ready again. Returning false also means that the current write was dropped (this MUST be * guaranteed to ensure duplication-free delivery). * * @param payload @@ -196,9 +258,10 @@ trait AssociationHandle { def write(payload: ByteString): Boolean /** - * Closes the underlying transport link, if needed. Some transport may not need an explicit teardown (UDP) and - * some transports may not support it (hardware connections). Remote endpoint of the channel or connection ''may'' - * be notified, but this is not guaranteed. + * Closes the underlying transport link, if needed. Some transports might not need an explicit teardown (UDP) and + * some transports may not support it (hardware connections). Remote endpoint of the channel or connection MAY + * be notified, but this is not guaranteed. The Transport that provides the handle MUST guarantee that disassociate() + * could be called arbitrarily many times. * */ def disassociate(): Unit diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index b981d5c09e..54497a447b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -1,7 +1,7 @@ package akka.remote.transport.netty import akka.ConfigurationException -import akka.actor.{ Address, ExtendedActorSystem, ActorRef } +import akka.actor.{ Address, ExtendedActorSystem } import akka.event.Logging import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup } import akka.remote.transport.Transport._ @@ -21,6 +21,7 @@ import scala.concurrent.{ ExecutionContext, Promise, Future } import scala.util.Random import scala.util.control.NonFatal import akka.dispatch.ThreadPoolConfig +import akka.remote.transport.AssociationHandle.HandleEventListener object NettyTransportSettings { sealed trait Mode @@ -37,7 +38,7 @@ class NettyTransportSettings(config: Config) { val TransportMode: Mode = getString("transport-protocol") match { case "tcp" ⇒ Tcp case "udp" ⇒ Udp - case s @ _ ⇒ throw new ConfigurationException("Unknown transport specified in transport-protocol: " + s) + case s @ _ ⇒ throw new ConfigurationException("Unknown transport: " + s) } val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp) @@ -95,13 +96,15 @@ trait HasTransport { } trait CommonHandlers extends NettyHelpers with HasTransport { - import transport.executionContext final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channels.add(e.getChannel) protected def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle - protected def registerReader(channel: Channel, readerRef: ActorRef, msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit + protected def registerListener(channel: Channel, + listener: HandleEventListener, + msg: ChannelBuffer, + remoteSocketAddress: InetSocketAddress): Unit final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer)(op: (AssociationHandle ⇒ Any)): Unit = { import transport._ @@ -109,8 +112,8 @@ trait CommonHandlers extends NettyHelpers with HasTransport { case (Some(localAddress), Some(remoteAddress)) ⇒ val handle = createHandle(channel, localAddress, remoteAddress) handle.readHandlerPromise.future.onSuccess { - case readerRef: ActorRef ⇒ - registerReader(channel, readerRef, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) + case listener: HandleEventListener ⇒ + registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) channel.setReadable(true) } op(handle) @@ -121,14 +124,15 @@ trait CommonHandlers extends NettyHelpers with HasTransport { } abstract class ServerHandler(protected final val transport: NettyTransport, - private final val associationHandlerFuture: Future[ActorRef]) + private final val associationListenerFuture: Future[AssociationEventListener]) extends NettyServerHelpers with CommonHandlers with HasTransport { + import transport.executionContext final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { channel.setReadable(false) - associationHandlerFuture.onSuccess { - case ref: ActorRef ⇒ init(channel, remoteSocketAddress, msg) { ref ! InboundAssociation(_) } + associationListenerFuture.onSuccess { + case listener: AssociationEventListener ⇒ init(channel, remoteSocketAddress, msg) { listener notify InboundAssociation(_) } } } @@ -157,6 +161,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s import NettyTransport._ import settings._ + implicit val executionContext: ExecutionContext = system.dispatcher override val schemeIdentifier: String = TransportMode + (if (EnableSsl) ".ssl" else "") @@ -169,7 +174,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s private val log = Logging(system, this.getClass) - final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, ActorRef]() + final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]() val channels = new DefaultDisposableChannelGroup("netty-transport-" + Random.nextString(20)) @@ -202,13 +207,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s pipeline } - private val associationHandlerPromise: Promise[ActorRef] = Promise() + private val associationListenerPromise: Promise[AssociationEventListener] = Promise() private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory { override def getPipeline: ChannelPipeline = { val pipeline = newPipeline if (EnableSsl) pipeline.addFirst("SslHandler", NettySSLSupport(settings.SslSettings.get, log, false)) - val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationHandlerPromise.future) - else new TcpServerHandler(NettyTransport.this, associationHandlerPromise.future) + val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationListenerPromise.future) + else new TcpServerHandler(NettyTransport.this, associationListenerPromise.future) pipeline.addLast("ServerHandler", handler) pipeline } @@ -266,8 +271,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s def addressToSocketAddress(addr: Address): InetSocketAddress = new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) - override def listen: Future[(Address, Promise[ActorRef])] = { - val listenPromise: Promise[(Address, Promise[ActorRef])] = Promise() + override def listen: Future[(Address, Promise[AssociationEventListener])] = { + val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise() try { masterChannel = inboundBootstrap match { @@ -282,12 +287,12 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s addressFromSocketAddress(masterChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match { case Some(address) ⇒ - val handlerPromise: Promise[ActorRef] = Promise() - listenPromise.success((address, handlerPromise)) + val listenerPromise: Promise[AssociationEventListener] = Promise() + listenPromise.success((address, listenerPromise)) localAddress = address - handlerPromise.future.onSuccess { - case ref: ActorRef ⇒ - associationHandlerPromise.success(ref) + listenerPromise.future.onSuccess { + case listener: AssociationEventListener ⇒ + associationListenerPromise.success(listener) masterChannel.setReadable(true) } @@ -338,7 +343,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case addr: InetSocketAddress ⇒ statusPromise.success(Ready(handle)) handle.readHandlerPromise.future.onSuccess { - case ref: ActorRef ⇒ udpConnectionTable.put(addr, ref) + case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener) } case a @ _ ⇒ statusPromise.success(Fail( new NettyTransportException("Unknown remote address type " + a.getClass, null))) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 5a006e0bd0..6b78add062 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -1,48 +1,48 @@ package akka.remote.transport.netty -import akka.actor.{ Address, ActorRef } +import akka.actor.{ Address } import akka.remote.transport.AssociationHandle -import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload } -import akka.remote.transport.Transport.Status +import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener, Disassociated, InboundPayload } +import akka.remote.transport.Transport.{ AssociationEventListener, Status } import akka.util.ByteString import java.net.InetSocketAddress import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer } import org.jboss.netty.channel._ import scala.concurrent.{ Future, Promise } -object ChannelLocalActor extends ChannelLocal[Option[ActorRef]] { - override def initialValue(channel: Channel): Option[ActorRef] = None - def trySend(channel: Channel, msg: Any): Unit = get(channel) foreach { _ ! msg } +object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] { + override def initialValue(channel: Channel): Option[HandleEventListener] = None + def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg } } trait TcpHandlers extends CommonHandlers with HasTransport { import ChannelLocalActor._ - override def registerReader(channel: Channel, - readerRef: ActorRef, - msg: ChannelBuffer, - remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(readerRef)) + override def registerListener(channel: Channel, + listener: HandleEventListener, + msg: ChannelBuffer, + remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener)) override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, channel) override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) { - trySend(e.getChannel, Disassociated) + notifyListener(e.getChannel, Disassociated) } override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) { - trySend(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))) + notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))) } override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) { - trySend(e.getChannel, Disassociated) + notifyListener(e.getChannel, Disassociated) e.getChannel.close() // No graceful close here } } -class TcpServerHandler(_transport: NettyTransport, _associationHandlerFuture: Future[ActorRef]) - extends ServerHandler(_transport, _associationHandlerFuture) with TcpHandlers { +class TcpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener]) + extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers { override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) { initInbound(e.getChannel, e.getChannel.getRemoteAddress, null) @@ -62,7 +62,7 @@ class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Statu class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel) extends AssociationHandle { - override val readHandlerPromise: Promise[ActorRef] = Promise() + override val readHandlerPromise: Promise[HandleEventListener] = Promise() override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) { channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index 8e92a57980..bbc570ba8f 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -1,9 +1,9 @@ package akka.remote.transport.netty -import akka.actor.{ ActorRef, Address } +import akka.actor.{ Address } import akka.remote.transport.AssociationHandle -import akka.remote.transport.AssociationHandle.InboundPayload -import akka.remote.transport.Transport.Status +import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload } +import akka.remote.transport.Transport.{ AssociationEventListener, Status } import akka.util.ByteString import java.net.{ SocketAddress, InetAddress, InetSocketAddress } import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } @@ -15,16 +15,16 @@ trait UdpHandlers extends CommonHandlers with HasTransport { override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new UdpAssociationHandle(localAddress, remoteAddress, channel, transport) - override def registerReader(channel: Channel, - readerRef: ActorRef, - msg: ChannelBuffer, - remoteSocketAddress: InetSocketAddress): Unit = { - val oldReader: ActorRef = transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, readerRef) + override def registerListener(channel: Channel, + listener: HandleEventListener, + msg: ChannelBuffer, + remoteSocketAddress: InetSocketAddress): Unit = { + val oldReader: HandleEventListener = transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener) if (oldReader ne null) { - throw new NettyTransportException(s"Reader $readerRef attempted to register for remote address $remoteSocketAddress" + + throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" + s" but $oldReader was already registered.", null) } - readerRef ! InboundPayload(ByteString(msg.array())) + listener notify InboundPayload(ByteString(msg.array())) } override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) { @@ -35,8 +35,8 @@ trait UdpHandlers extends CommonHandlers with HasTransport { initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer]) } else { - val reader = transport.udpConnectionTable.get(inetSocketAddress) - reader ! InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())) + val listener = transport.udpConnectionTable.get(inetSocketAddress) + listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())) } } } @@ -44,8 +44,8 @@ trait UdpHandlers extends CommonHandlers with HasTransport { def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit } -class UdpServerHandler(_transport: NettyTransport, _associationHandlerFuture: Future[ActorRef]) - extends ServerHandler(_transport, _associationHandlerFuture) with UdpHandlers { +class UdpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener]) + extends ServerHandler(_transport, _associationListenerFuture) with UdpHandlers { override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = initInbound(channel, remoteSocketAddress, msg) @@ -63,7 +63,7 @@ class UdpAssociationHandle(val localAddress: Address, private val channel: Channel, private val transport: NettyTransport) extends AssociationHandle { - override val readHandlerPromise: Promise[ActorRef] = Promise() + override val readHandlerPromise: Promise[HandleEventListener] = Promise() override def write(payload: ByteString): Boolean = { if (!channel.isConnected) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index dd16edade0..61443879c0 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -38,6 +38,7 @@ object RemoteCommunicationSpec { class RemoteCommunicationSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" + remote.transport = "akka.remote.netty.NettyRemoteTransport" remote.netty { hostname = localhost port = 12345 @@ -48,7 +49,7 @@ akka { /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" } } -""") with ImplicitSender with DefaultTimeout { + """) with ImplicitSender with DefaultTimeout { import RemoteCommunicationSpec._ diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 45b6ad5610..06a0882528 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -19,9 +19,10 @@ class RemoteConfigSpec extends AkkaSpec( } """) { + // These tests are ignored as it tests configuration specific to the old remoting. "Remoting" must { - "be able to parse generic remote config elements" in { + "be able to parse generic remote config elements" ignore { val settings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings import settings._ @@ -31,7 +32,7 @@ class RemoteConfigSpec extends AkkaSpec( LogRemoteLifeCycleEvents must be(true) } - "be able to parse Netty config elements" in { + "be able to parse Netty config elements" ignore { val settings = system.asInstanceOf[ExtendedActorSystem] .provider.asInstanceOf[RemoteActorRefProvider] diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index a7b1c3699c..636910aa64 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -13,17 +13,18 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { - /watchers.remote = "akka://other@127.0.0.1:2666" + /watchers.remote = "tcp.akka://other@localhost:2666" } } - remote.netty { - hostname = "127.0.0.1" + remoting.tcp { + hostname = "localhost" port = 0 } } -""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec { + """)) with ImplicitSender with DefaultTimeout with DeathWatchSpec { - val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.port=2666").withFallback(system.settings.config)) + val other = ActorSystem("other", ConfigFactory.parseString("akka.remoting.transports.tcp.port=2666") + .withFallback(system.settings.config)) override def atTermination() { other.shutdown() diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 6622b98d81..b071a353e6 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -18,7 +18,7 @@ object RemoteDeployerSpec { remote = "akka://sys@wallace:2552" } } - akka.remote.netty.port = 0 + akka.remoting.transports.tcp.port = 0 """, ConfigParseOptions.defaults) class RecipeActor extends Actor { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index ab7fb23356..e4611d0ba1 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -21,7 +21,7 @@ object RemoteRouterSpec { class RemoteRouterSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty { + remoting.transports.tcp { hostname = localhost port = 0 } @@ -29,7 +29,7 @@ akka { /blub { router = round-robin nr-of-instances = 2 - target.nodes = ["akka://remote-sys@localhost:12347"] + target.nodes = ["tcp.akka://remote-sys@localhost:12347"] } /elastic-blub { router = round-robin @@ -37,10 +37,10 @@ akka { lower-bound = 2 upper-bound = 3 } - target.nodes = ["akka://remote-sys@localhost:12347"] + target.nodes = ["tcp.akka://remote-sys@localhost:12347"] } /remote-blub { - remote = "akka://remote-sys@localhost:12347" + remote = "tcp.akka://remote-sys@localhost:12347" router = round-robin nr-of-instances = 2 } @@ -48,12 +48,12 @@ akka { remote = "akka://RemoteRouterSpec" router = round-robin nr-of-instances = 2 - target.nodes = ["akka://remote-sys@localhost:12347"] + target.nodes = ["tcp.akka://remote-sys@localhost:12347"] } /local-blub2 { router = round-robin nr-of-instances = 4 - target.nodes = ["akka://remote-sys@localhost:12347"] + target.nodes = ["tcp.akka://remote-sys@localhost:12347"] } } } @@ -61,7 +61,7 @@ akka { import RemoteRouterSpec._ - val conf = ConfigFactory.parseString("""akka.remote.netty.port=12347 + val conf = ConfigFactory.parseString("""akka.remoting.transports.tcp.port=12347 akka.actor.deployment { /remote-override { router = round-robin @@ -85,13 +85,13 @@ akka.actor.deployment { val children = replies.toSet children must have size 2 children.map(_.parent) must have size 1 - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "deploy its children on remote host driven by programatic definition" in { val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2), - Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub2") + Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub2") val replies = for (i ← 1 to 5) yield { router ! "" expectMsgType[ActorRef].path @@ -99,7 +99,7 @@ akka.actor.deployment { val children = replies.toSet children must have size 2 children.map(_.parent) must have size 1 - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } @@ -112,13 +112,13 @@ akka.actor.deployment { val children = replies.toSet children.size must be >= 2 children.map(_.parent) must have size 1 - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "deploy remote routers based on configuration" in { val router = system.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub") - router.path.address.toString must be("akka://remote-sys@localhost:12347") + router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347") val replies = for (i ← 1 to 5) yield { router ! "" expectMsgType[ActorRef].path @@ -128,14 +128,14 @@ akka.actor.deployment { val parents = children.map(_.parent) parents must have size 1 parents.head must be(router.path) - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "deploy remote routers based on explicit deployment" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) - .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "remote-blub2") - router.path.address.toString must be("akka://remote-sys@localhost:12347") + .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-blub2") + router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347") val replies = for (i ← 1 to 5) yield { router ! "" expectMsgType[ActorRef].path @@ -145,13 +145,13 @@ akka.actor.deployment { val parents = children.map(_.parent) parents must have size 1 parents.head must be(router.path) - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "let remote deployment be overridden by local configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) - .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "local-blub") + .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub") router.path.address.toString must be("akka://RemoteRouterSpec") val replies = for (i ← 1 to 5) yield { router ! "" @@ -161,15 +161,15 @@ akka.actor.deployment { children must have size 2 val parents = children.map(_.parent) parents must have size 1 - parents.head.address must be(Address("akka", "remote-sys", "localhost", 12347)) - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + parents.head.address must be(Address("tcp.akka", "remote-sys", "localhost", 12347)) + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "let remote deployment router be overridden by local configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) - .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "local-blub2") - router.path.address.toString must be("akka://remote-sys@localhost:12347") + .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub2") + router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347") val replies = for (i ← 1 to 5) yield { router ! "" expectMsgType[ActorRef].path @@ -179,14 +179,14 @@ akka.actor.deployment { val parents = children.map(_.parent) parents must have size 1 parents.head must be(router.path) - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } "let remote deployment be overridden by remote configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) - .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "remote-override") - router.path.address.toString must be("akka://remote-sys@localhost:12347") + .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-override") + router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347") val replies = for (i ← 1 to 5) yield { router ! "" expectMsgType[ActorRef].path @@ -196,7 +196,7 @@ akka.actor.deployment { val parents = children.map(_.parent) parents must have size 1 parents.head must be(router.path) - children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347") + children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347") system.stop(router) } @@ -206,7 +206,7 @@ akka.actor.deployment { } val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig( RoundRobinRouter(1, supervisorStrategy = escalator), - Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub3") + Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub3") router ! CurrentRoutees EventFilter[ActorKilledException](occurrences = 1) intercept { diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index e5beca0122..efed787649 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -34,31 +34,6 @@ object RemotingSpec { } val cfg: Config = ConfigFactory parseString (""" - common-transport-settings { - log-transport-events = true - connection-timeout = 120s - use-dispatcher-for-io = "" - write-buffer-high-water-mark = 0b - write-buffer-low-water-mark = 0b - send-buffer-size = 32000b - receive-buffer-size = 32000b - backlog = 4096 - hostname = localhost - enable-ssl = false - - server-socket-worker-pool { - pool-size-min = 2 - pool-size-factor = 1.0 - pool-size-max = 8 - } - - client-socket-worker-pool { - pool-size-min = 2 - pool-size-factor = 1.0 - pool-size-max = 8 - } - } - common-ssl-settings { key-store = "%s" trust-store = "%s" @@ -76,44 +51,21 @@ object RemotingSpec { remoting.retry-latch-closed-for = 1 s remoting.log-remote-lifecycle-events = on + remoting.enabled-transports = [test, tcp, udp, ssl] - remoting.transports = [ - { + remoting.transports.tcp.port = 12345 + remoting.transports.udp.port = 12345 + remoting.transports.ssl.port = 23456 + remoting.transports.ssl.ssl = ${common-ssl-settings} + + remoting.transports.test { transport-class = "akka.remote.transport.TestTransport" - settings { - registry-key = aX33k0jWKg - local-address = "test://RemotingSpec@localhost:12345" - maximum-payload-bytes = 32000 bytes - scheme-identifier = test - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = tcp - port = 12345 - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = udp - port = 12345 - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = tcp - enable-ssl = true - port = 23456 - ssl = ${common-ssl-settings} - } - } - ] + applied-adapters = [] + registry-key = aX33k0jWKg + local-address = "test://RemotingSpec@localhost:12345" + maximum-payload-bytes = 32000 bytes + scheme-identifier = test + } actor.deployment { /blub.remote = "test.akka://remote-sys@localhost:12346" @@ -137,44 +89,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val conf = ConfigFactory.parseString( """ - akka.remote.netty.port=12346 - akka.remoting.transports = [ - { - transport-class = "akka.remote.transport.TestTransport" - settings { - registry-key = aX33k0jWKg - local-address = "test://remote-sys@localhost:12346" - maximum-payload-bytes = 32000 bytes - scheme-identifier = test - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = tcp - port = 12346 - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = udp - port = 12346 - } - }, - { - transport-class = "akka.remote.transport.netty.NettyTransport" - settings = ${common-transport-settings} - settings { - transport-protocol = tcp - enable-ssl = true - port = 23457 - ssl = ${common-ssl-settings} - } - } - ] + akka.remoting.transports { + tcp.port = 12346 + udp.port = 12346 + ssl.port = 23457 + test.local-address = "test://remote-sys@localhost:12346" + } """).withFallback(system.settings.config).resolve() val other = ActorSystem("remote-sys", conf) @@ -195,12 +115,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "support remote look-ups" in { here ! "ping" - expectMsg("pong") - lastSender must be(testActor) + expectMsg(("pong", testActor)) } "send error message for wrong address" in { - EventFilter.error(start = "AssociationError", occurrences = 1).intercept { + EventFilter.error(start = "AssociationError").intercept { system.actorFor("test.akka://nonexistingsystem@localhost:12346/user/echo") ! "ping" } } diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index ba9918d120..7498cf269c 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -33,7 +33,7 @@ object Configuration { filter-leeway = 10s default-timeout = 10s } - + remote.transport = "akka.remote.netty.NettyRemoteTransport" remote.netty { hostname = localhost port = %d diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala index a1836c9d47..bb396df189 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala @@ -4,38 +4,36 @@ import akka.testkit._ import akka.actor._ import com.typesafe.config._ import scala.concurrent.duration._ -import akka.remote.netty.NettyRemoteTransport +import akka.remote.netty.{ SSLSettings, NettyRemoteTransport } import java.util.ArrayList @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978ConfigSpec extends AkkaSpec(""" -akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty { - hostname = localhost - port = 0 - } -} -""") with ImplicitSender with DefaultTimeout { +class Ticket1978ConfigSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { + + val cfg = ConfigFactory.parseString(""" + ssl-settings { + key-store = "keystore" + trust-store = "truststore" + key-store-password = "changeme" + trust-store-password = "changeme" + protocol = "TLSv1" + random-number-generator = "AES128CounterSecureRNG" + enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA] + sha1prng-random-source = "/dev/./urandom" + }""") "SSL Remoting" must { "be able to parse these extra Netty config elements" in { - val settings = - system.asInstanceOf[ExtendedActorSystem] - .provider.asInstanceOf[RemoteActorRefProvider] - .transport.asInstanceOf[NettyRemoteTransport] - .settings - import settings._ + val settings = new SSLSettings(cfg.getConfig("ssl-settings")) - EnableSSL must be(false) - SslSettings.SSLKeyStore must be(Some("keystore")) - SslSettings.SSLKeyStorePassword must be(Some("changeme")) - SslSettings.SSLTrustStore must be(Some("truststore")) - SslSettings.SSLTrustStorePassword must be(Some("changeme")) - SslSettings.SSLProtocol must be(Some("TLSv1")) - SslSettings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA")) - SslSettings.SSLRandomSource must be(None) - SslSettings.SSLRandomNumberGenerator must be(None) + settings.SSLKeyStore must be(Some("keystore")) + settings.SSLKeyStorePassword must be(Some("changeme")) + settings.SSLTrustStore must be(Some("truststore")) + settings.SSLTrustStorePassword must be(Some("changeme")) + settings.SSLProtocol must be(Some("TLSv1")) + settings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA")) + settings.SSLRandomSource must be(Some("/dev/./urandom")) + settings.SSLRandomNumberGenerator must be(Some("AES128CounterSecureRNG")) } } } diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index bddfd6d39b..821ae71d3c 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -25,16 +25,15 @@ import akka.actor.PoisonPill class UntrustedSpec extends AkkaSpec(""" akka.actor.provider = akka.remote.RemoteActorRefProvider akka.remote.untrusted-mode = on -akka.remote.netty.port = 0 -akka.remote.log-remote-lifecycle-events = off +akka.remoting.transports.tcp.port = 0 akka.loglevel = DEBUG """) with ImplicitSender { val other = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString(""" akka.actor.provider = akka.remote.RemoteActorRefProvider - akka.remote.netty.port = 0 - """)) - val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress + akka.remoting.transports.tcp.port = 0 + """)) + val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head val target1 = other.actorFor(RootActorPath(addr) / "remote") val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 77fe3e8e93..133be9d546 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -390,7 +390,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re wrappedHandle.readHandlerPromise.success(testActor) - Thread.sleep(100) + Thread.sleep(100) //FIXME: Remove this reader ! Disassociated diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala new file mode 100644 index 0000000000..acd9939621 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -0,0 +1,90 @@ +package akka.remote.transport + +import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec } +import com.typesafe.config.{ Config, ConfigFactory } +import AkkaProtocolStressTest._ +import akka.actor._ +import scala.concurrent.duration._ + +object AkkaProtocolStressTest { + val configA: Config = ConfigFactory parseString (""" + akka { + #loglevel = DEBUG + actor.provider = "akka.remote.RemoteActorRefProvider" + + remoting.retry-latch-closed-for = 0 s + remoting.log-remote-lifecycle-events = on + + remoting.failure-detector { + threshold = 1.0 + max-sample-size = 2 + min-std-deviation = 1 ms + acceptable-heartbeat-pause = 0.01 s + } + remoting.retry-window = 1 s + remoting.maximum-retries-in-window = 1000 + + remoting.transports.tcp { + applied-adapters = ["gremlin"] + port = 12345 + } + + } + """) + + class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor { + val limit = 10000 + var nextSeq = 0 + var maxSeq = -1 + var losses = 0 + + def receive = { + case "start" ⇒ self ! "sendNext" + case "sendNext" ⇒ if (nextSeq < limit) { + remote ! nextSeq + nextSeq += 1 + self ! "sendNext" + } + case seq: Int ⇒ + if (seq > maxSeq) { + losses += seq - maxSeq - 1 + maxSeq = seq + if (seq > limit * 0.9) { + controller ! (maxSeq, losses) + } + } else { + controller ! "Received out of order message. Previous: ${maxSeq} Received: ${seq}" + } + } + } + +} + +class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout { + + val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346") + .withFallback(system.settings.config).resolve() + + val systemB = ActorSystem("systemB", configB) + val remote = systemB.actorOf(Props(new Actor { + def receive = { + case seq: Int ⇒ sender ! seq + } + }), "echo") + + val here = system.actorFor("tcp.gremlin.akka://systemB@localhost:12346/user/echo") + + "AkkaProtocolTransport" must { + "guarantee at-most-once delivery and message ordering despite packet loss" taggedAs TimingTest in { + val tester = system.actorOf(Props(new SequenceVerifier(here, self))) ! "start" + + expectMsgPF(30 seconds) { + case (received: Int, lost: Int) ⇒ + log.warning(s" ######## Received ${received - lost} messages from ${received} ########") + } + } + } + + override def atTermination(): Unit = systemB.shutdown() + +} diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala new file mode 100644 index 0000000000..6c22241a0d --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -0,0 +1,167 @@ +package akka.remote.transport + +import akka.actor.{ ExtendedActorSystem, Address } +import akka.remote.transport.AssociationHandle.Disassociated +import akka.remote.transport.AssociationHandle.InboundPayload +import akka.remote.transport.TestTransport._ +import akka.remote.transport.Transport.Fail +import akka.remote.transport.Transport.InboundAssociation +import akka.remote.transport.Transport.Ready +import akka.remote.transport.Transport.Status +import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec } +import akka.util.ByteString +import scala.concurrent.{ Future, Await } +import akka.remote.RemoteActorRefProvider + +abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) + extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """) + with DefaultTimeout with ImplicitSender { + + def transportName: String + def schemeIdentifier: String + + val addressATest: Address = Address("test", "testsytemA", "testhostA", 4321) + val addressBTest: Address = Address("test", "testsytemB", "testhostB", 5432) + + val addressA: Address = addressATest.copy(protocol = s"${addressATest.protocol}.$schemeIdentifier") + val addressB: Address = addressBTest.copy(protocol = s"${addressBTest.protocol}.$schemeIdentifier") + val nonExistingAddress = Address("test." + schemeIdentifier, "nosystem", "nohost", 0) + + def freshTransport(testTransport: TestTransport): Transport + def wrapTransport(transport: Transport): Transport = if (withAkkaProtocol) { + val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] + new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec) + } else transport + + def newTransportA(registry: AssociationRegistry): Transport = + wrapTransport(freshTransport(new TestTransport(addressATest, registry))) + def newTransportB(registry: AssociationRegistry): Transport = + wrapTransport(freshTransport(new TestTransport(addressBTest, registry))) + + transportName must { + + "return an Address and promise when listen is called" in { + val registry = new AssociationRegistry + val transportA = newTransportA(registry) + + val result = Await.result(transportA.listen, timeout.duration) + + result._1 must be(addressA) + result._2 must not be null + + registry.logSnapshot.exists { + case ListenAttempt(address) ⇒ address == addressATest + case _ ⇒ false + } must be(true) + } + + "associate successfully with another transport of its kind" in { + val registry = new AssociationRegistry + val transportA = newTransportA(registry) + val transportB = newTransportB(registry) + + // Must complete the returned promise to receive events + Await.result(transportA.listen, timeout.duration)._2.success(self) + Await.result(transportB.listen, timeout.duration)._2.success(self) + + awaitCond(registry.transportsReady(addressATest, addressBTest)) + + transportA.associate(addressB) + expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { + case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ + } + + registry.logSnapshot.contains(AssociateAttempt(addressATest, addressBTest)) must be(true) + awaitCond(registry.existsAssociation(addressATest, addressBTest)) + } + + "fail to associate with nonexisting address" in { + val registry = new AssociationRegistry + val transportA = newTransportA(registry) + + Await.result(transportA.listen, timeout.duration)._2.success(self) + awaitCond(registry.transportsReady(addressATest)) + + Await.result(transportA.associate(nonExistingAddress), timeout.duration) match { + case Fail(_) ⇒ + case _ ⇒ fail() + } + } + + "successfully send PDUs" in { + val registry = new AssociationRegistry + val transportA = newTransportA(registry) + val transportB = newTransportB(registry) + + Await.result(transportA.listen, timeout.duration)._2.success(self) + Await.result(transportB.listen, timeout.duration)._2.success(self) + + awaitCond(registry.transportsReady(addressATest, addressBTest)) + + val associate: Future[Status] = transportA.associate(addressB) + val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { + case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle + } + + val Ready(handleA) = Await.result(associate, timeout.duration) + + // Initialize handles + handleA.readHandlerPromise.success(self) + handleB.readHandlerPromise.success(self) + + val payload = ByteString("PDU") + val pdu = if (withAkkaProtocol) AkkaPduProtobufCodec.constructPayload(payload) else payload + + awaitCond(registry.existsAssociation(addressATest, addressBTest)) + + handleA.write(payload) + expectMsgPF(timeout.duration, "Expect InboundPayload from A") { + case InboundPayload(p) if payload == p ⇒ + } + + registry.logSnapshot.exists { + case WriteAttempt(sender, recipient, sentPdu) ⇒ + sender == addressATest && recipient == addressBTest && sentPdu == pdu + case _ ⇒ false + } must be(true) + } + + "successfully disassociate" in { + val registry = new AssociationRegistry + val transportA = newTransportA(registry) + val transportB = newTransportB(registry) + + Await.result(transportA.listen, timeout.duration)._2.success(self) + Await.result(transportB.listen, timeout.duration)._2.success(self) + + awaitCond(registry.transportsReady(addressATest, addressBTest)) + + val associate: Future[Status] = transportA.associate(addressB) + val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { + case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle + } + + val Ready(handleA) = Await.result(associate, timeout.duration) + + // Initialize handles + handleA.readHandlerPromise.success(self) + handleB.readHandlerPromise.success(self) + + awaitCond(registry.existsAssociation(addressATest, addressBTest)) + + handleA.disassociate() + + expectMsgPF(timeout.duration) { + case Disassociated ⇒ + } + + awaitCond(!registry.existsAssociation(addressATest, addressBTest)) + + registry.logSnapshot exists { + case DisassociateAttempt(requester, remote) if requester == addressATest && remote == addressBTest ⇒ true + case _ ⇒ false + } must be(true) + } + + } +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala index c4f51191c4..2c8eff1122 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala @@ -10,15 +10,15 @@ import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload } class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { - val addressA: Address = Address("akka", "testsytemA", "testhostA", 4321) - val addressB: Address = Address("akka", "testsytemB", "testhostB", 5432) - val nonExistingAddress = Address("akka", "nosystem", "nohost", 0) + val addressA: Address = Address("test", "testsytemA", "testhostA", 4321) + val addressB: Address = Address("test", "testsytemB", "testhostB", 5432) + val nonExistingAddress = Address("test", "nosystem", "nohost", 0) "TestTransport" must { "return an Address and promise when listen is called and log calls" in { val registry = new AssociationRegistry - var transportA = new TestTransport(addressA, registry) + val transportA = new TestTransport(addressA, registry) val result = Await.result(transportA.listen, timeout.duration) @@ -33,14 +33,14 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender "associate successfully with another TestTransport and log" in { val registry = new AssociationRegistry - var transportA = new TestTransport(addressA, registry) - var transportB = new TestTransport(addressB, registry) + val transportA = new TestTransport(addressA, registry) + val transportB = new TestTransport(addressB, registry) // Must complete the returned promise to receive events Await.result(transportA.listen, timeout.duration)._2.success(self) Await.result(transportB.listen, timeout.duration)._2.success(self) - awaitCond(registry.transportsReady(transportA, transportB)) + awaitCond(registry.transportsReady(addressA, addressB)) transportA.associate(addressB) expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { @@ -63,13 +63,13 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender "emulate sending PDUs and logs write" in { val registry = new AssociationRegistry - var transportA = new TestTransport(addressA, registry) - var transportB = new TestTransport(addressB, registry) + val transportA = new TestTransport(addressA, registry) + val transportB = new TestTransport(addressB, registry) Await.result(transportA.listen, timeout.duration)._2.success(self) Await.result(transportB.listen, timeout.duration)._2.success(self) - awaitCond(registry.transportsReady(transportA, transportB)) + awaitCond(registry.transportsReady(addressA, addressB)) val associate: Future[Status] = transportA.associate(addressB) val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { @@ -100,13 +100,13 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender "emulate disassociation and log it" in { val registry = new AssociationRegistry - var transportA = new TestTransport(addressA, registry) - var transportB = new TestTransport(addressB, registry) + val transportA = new TestTransport(addressA, registry) + val transportB = new TestTransport(addressB, registry) Await.result(transportA.listen, timeout.duration)._2.success(self) Await.result(transportB.listen, timeout.duration)._2.success(self) - awaitCond(registry.transportsReady(transportA, transportB)) + awaitCond(registry.transportsReady(addressA, addressB)) val associate: Future[Status] = transportA.associate(addressB) val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala new file mode 100644 index 0000000000..c5cc5a8a9f --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala @@ -0,0 +1,99 @@ +package akka.remote.transport + +import akka.testkit.AkkaSpec +import akka.remote.transport.ThrottlerTransportAdapter.{ TokenBucket, Unthrottled } +import java.util.concurrent.TimeUnit + +class ThrottleModeSpec extends AkkaSpec { + + "ThrottleMode" must { + + "allow consumption of infinite amount of tokens when untrhottled" in { + val bucket = Unthrottled + bucket.tryConsumeTokens(0, 100) must be((Unthrottled, true)) + bucket.tryConsumeTokens(100000, 1000) must be((Unthrottled, true)) + bucket.tryConsumeTokens(1000000, 10000) must be((Unthrottled, true)) + } + + "in tokenbucket mode allow consuming tokens up to capacity" in { + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 100) + val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10) + bucket1 must be(TokenBucket(100, 100, 0, 90)) + success1 must be(true) + + val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = 0L, 40) + bucket2 must be(TokenBucket(100, 100, 0, 50)) + success2 must be(true) + + val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 0L, 50) + bucket3 must be(TokenBucket(100, 100, 0, 0)) + success3 must be(true) + + val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 0, 1) + bucket4 must be(TokenBucket(100, 100, 0, 0)) + success4 must be(false) + } + + "accurately replenish tokens" in { + val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 0) + val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 0) + bucket1 must be(TokenBucket(100, 100, 0, 0)) + success1 must be(true) + + val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 0) + bucket2 must be(TokenBucket(100, 100, halfSecond, 50)) + success2 must be(true) + + val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 0) + bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 100)) + success3 must be(true) + + val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 0) + bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 100)) + success4 must be(true) + } + + "accurately interleave replenish and consume" in { + val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20) + val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10) + bucket1 must be(TokenBucket(100, 100, 0, 10)) + success1 must be(true) + + val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 60) + bucket2 must be(TokenBucket(100, 100, halfSecond, 0)) + success2 must be(true) + + val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 40) + bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 10)) + success3 must be(true) + + val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 70) + bucket4 must be(TokenBucket(100, 100, 2 * halfSecond, 10)) + success4 must be(false) + } + + "allow oversized packets through by loaning" in { + val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20) + val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 30) + bucket1 must be(TokenBucket(100, 100, 0, 20)) + success1 must be(false) + + val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 110) + bucket2 must be(TokenBucket(100, 100, halfSecond, -40)) + success2 must be(true) + + val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 20) + bucket3 must be(TokenBucket(100, 100, halfSecond, -40)) + success3 must be(false) + + val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 20) + bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 40)) + success4 must be(true) + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala new file mode 100644 index 0000000000..4ab7b76d2a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -0,0 +1,94 @@ +package akka.remote.transport + +import com.typesafe.config.{ ConfigFactory, Config } +import akka.actor._ +import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec } +import ThrottlerTransportAdapterSpec._ +import scala.concurrent.duration._ +import akka.remote.transport.TestTransport.{ WriteAttempt, AssociationRegistry } +import scala.concurrent.{ Promise, Future, Await } +import akka.remote.transport.Transport.{ Ready, InboundAssociation, Status } +import akka.util.ByteString +import akka.remote.transport.AssociationHandle.InboundPayload +import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle } +import akka.remote.RemoteActorRefProvider + +object ThrottlerTransportAdapterSpec { + val configA: Config = ConfigFactory parseString (""" + akka { + #loglevel = DEBUG + actor.provider = "akka.remote.RemoteActorRefProvider" + + remoting.retry-latch-closed-for = 0 s + remoting.log-remote-lifecycle-events = on + + remoting.transports.tcp.applied-adapters = ["trttl"] + remoting.transports.tcp.port = 12345 + } + """) + + class Echo extends Actor { + override def receive = { + case "ping" ⇒ sender ! "pong" + } + } + + val PingPacketSize = 148 + val MessageCount = 100 + val BytesPerSecond = 500 + val TotalTime = (MessageCount * PingPacketSize) / BytesPerSecond + + class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor { + var messageCount = MessageCount + var received = 0 + var startTime = 0L + + override def receive = { + case "start" ⇒ + self ! "sendNext" + startTime = System.nanoTime() + case "sendNext" ⇒ if (messageCount > 0) { + remote ! "ping" + self ! "sendNext" + messageCount -= 1 + } + case "pong" ⇒ + received += 1 + if (received >= MessageCount) controller ! (System.nanoTime() - startTime) + } + } +} + +class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout { + val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346") + .withFallback(system.settings.config).resolve() + + val systemB = ActorSystem("systemB", configB) + val remote = systemB.actorOf(Props[Echo], "echo") + + val here = system.actorFor("tcp.trttl.akka://systemB@localhost:12346/user/echo") + + "ThrottlerTransportAdapter" must { + "maintain average message rate" taggedAs TimingTest in { + Await.result( + system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport + .managementCommand(SetThrottle(Address("akka", "systemB", "localhost", 12346), Direction.Send, TokenBucket(200, 500, 0, 0))), 3 seconds) + val tester = system.actorOf(Props(new ThrottlingTester(here, self))) ! "start" + + expectMsgPF((TotalTime + 3) seconds) { + case time: Long ⇒ log.warning("Total time of transmission: " + NANOSECONDS.toSeconds(time)) + } + } + } + + override def atTermination(): Unit = systemB.shutdown() +} + +class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) { + + def transportName = "ThrottlerTransportAdapter" + def schemeIdentifier = "trttl.akka" + def freshTransport(testTransport: TestTransport) = + new ThrottlerTransportAdapter(testTransport, system.asInstanceOf[ExtendedActorSystem]) + +}