Streamlined configuration, transport adapters and FailureInjector

- Transports no longer uses raw ActorRefs as listeners but proper interfaces.
 - Added managementCommand support to Transports
 - Added support for dynamically loadable transport adapters
 - Added throttler/failure injector transport adapter
 - added actor based adapter support
 - Changed configuration method of multiple transports - Fixed tests to work with the new remoting
This commit is contained in:
Endre Sándor Varga 2012-11-23 10:15:19 +01:00
parent 5b96c28acd
commit 0705d47a88
30 changed files with 1812 additions and 585 deletions

View file

@ -80,7 +80,7 @@ akka {
wait-activity-enabled = on
# FIXME document
backoff-interval = 1 s
backoff-interval = 0.01 s
# FIXME document
secure-cookie = ""
@ -105,13 +105,62 @@ akka {
# FIXME document
use-passive-connections = on
adapters {
gremlin = "akka.remote.transport.FailureInjectorProvider"
trttl = "akka.remote.transport.ThrottlerProvider"
}
enabled-transports = ["tcp"]
transports.tcp {
transport-class = "akka.remote.transport.netty.NettyTransport"
applied-adapters = []
transport-protocol = tcp
port = 2552
hostname = "localhost" #FIXME Empty string should default to localhost
enable-ssl = false
log-transport-events = true
connection-timeout = 120s
use-dispatcher-for-io = ""
write-buffer-high-water-mark = 0b
write-buffer-low-water-mark = 0b
send-buffer-size = 32000b
receive-buffer-size = 32000b
backlog = 4096
server-socket-worker-pool {
pool-size-min = 2
pool-size-factor = 1.0
pool-size-max = 8
}
client-socket-worker-pool {
pool-size-min = 2
pool-size-factor = 1.0
pool-size-max = 8
}
}
transports.udp = ${akka.remoting.transports.tcp}
transports.udp {
transport-protocol = udp
}
transports.ssl = ${akka.remoting.transports.tcp}
transports.ssl = {
enable-ssl = true
}
}
remote {
# Which implementation of akka.remote.RemoteTransport to use
# default is a TCP-based remote transport based on Netty
transport = "akka.remote.netty.NettyRemoteTransport"
transport = "akka.remote.Remoting"
# Enable untrusted mode for full security of server managed actors, prevents
# system messages to be send by clients, e.g. messages like 'Create',

View file

@ -46,6 +46,7 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
recipient match {
case `remoteDaemon`
if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") else {
if (LogReceive) log.debug("received daemon message {}", msgLog)
payload match {
case m @ (_: DaemonMsg | _: Terminated)
@ -54,6 +55,7 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
}
case x log.debug("remoteDaemon received illegal message {} from {}", x, sender)
}
}
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal
if (LogReceive) log.debug("received local message {}", msgLog)

View file

@ -134,7 +134,8 @@ class PhiAccrualFailureDetector(
/**
* Cumulative distribution function for N(mean, stdDeviation) normal distribution.
* This is an approximation defined in β Mathematics Handbook.
* This is an approximation defined in β Mathematics Handbook (Logistic approximation).
* Error is 0.00014 at +- 3.16
*/
private[akka] def cumulativeDistributionFunction(x: Double, mean: Double, stdDeviation: Double): Double = {
val y = (x - mean) / stdDeviation

View file

@ -164,9 +164,15 @@ class RemoteActorRefProvider(
if (isSelfAddress(addr)) {
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
} else {
try {
val localAddress = transport.localAddressForRemote(addr)
val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements
new RemoteActorRef(this, transport, localAddress, rpath, supervisor, Some(props), Some(d))
} catch {
case NonFatal(e)
log.error(e, "Error while looking up address {}", addr)
new EmptyLocalActorRef(this, path, eventStream)
}
}
case _ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
@ -174,10 +180,17 @@ class RemoteActorRefProvider(
}
}
def actorFor(path: ActorPath): InternalActorRef =
def actorFor(path: ActorPath): InternalActorRef = {
if (isSelfAddress(path.address)) actorFor(rootGuardian, path.elements)
else new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address),
else try {
new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address),
path, Nobody, props = None, deploy = None)
} catch {
case NonFatal(e)
log.error(e, "Error while looking up address {}", path.address)
new EmptyLocalActorRef(this, path, eventStream)
}
}
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case ActorPathExtractor(address, elems)

View file

@ -12,6 +12,7 @@ import akka.serialization.Serialization
import akka.remote.RemoteProtocol._
import akka.actor._
import scala.collection.immutable
import scala.concurrent.Future
/**
* Remote life-cycle events.
@ -220,6 +221,14 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message)
}
/**
* Sends a management command to the underlying transport stack. The call returns with a Future that indicates
* if the command was handled successfully or dropped.
* @param cmd Command message to send to the transports.
* @return A Future that indicates when the message was successfully handled or dropped.
*/
def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) }
/**
* A Logger that can be used to log issues that may occur
*/

View file

@ -1,14 +1,15 @@
package akka.remote
import scala.language.postfixOps
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.gracefulStop
import akka.remote.EndpointManager.{ StartupFinished, Listen, Send }
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.EndpointManager.{ StartupFinished, ManagementCommand, Listen, Send }
import akka.remote.transport.Transport.{ AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import akka.util.Timeout
import com.typesafe.config.Config
import com.typesafe.config.{ ConfigFactory, Config }
import scala.collection.immutable.{ Seq, HashMap }
import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await, Future }
@ -18,8 +19,9 @@ import java.util.concurrent.TimeoutException
import scala.util.{ Failure, Success }
import scala.collection.immutable
import akka.japi.Util.immutableSeq
import akka.remote.Remoting.RegisterTransportActor
class RemotingSettings(config: Config) {
class RemotingSettings(val config: Config) {
import config._
import scala.collection.JavaConverters._
@ -30,7 +32,7 @@ class RemotingSettings(config: Config) {
val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS)
val RetryGateClosedFor: Long = getMilliseconds("akka.remoting.retry-gate-closed-for")
val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for")
val UsePassiveConnections: Boolean = getBoolean("akka.remoting.use-passive-connections")
@ -41,10 +43,21 @@ class RemotingSettings(config: Config) {
val BackoffPeriod: FiniteDuration =
Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS)
val Transports: immutable.Seq[(String, Config)] =
immutableSeq(config.getConfigList("akka.remoting.transports")).map {
conf (conf.getString("transport-class"), conf.getConfig("settings"))
val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name
val transportConfig = transportConfigFor(name)
(transportConfig.getString("transport-class"),
immutableSeq(transportConfig.getStringList("applied-adapters")),
transportConfig)
}
val Adapters: Map[String, String] = configToMap(getConfig("akka.remoting.adapters"))
private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remoting.enabled-transports"))
private def transportConfigFor(transportName: String): Config = getConfig("akka.remoting.transports." + transportName)
private def configToMap(cfg: Config): Map[String, String] =
cfg.root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
}
private[remote] object Remoting {
@ -60,7 +73,7 @@ private[remote] object Remoting {
responsibleTransports.size match {
case 0
throw new RemoteTransportException(
s"No transport is responsible for address: [${remote}] although protocol [${remote.protocol}] is available." +
s"No transport is responsible for address: ${remote} although protocol ${remote.protocol} is available." +
" Make sure at least one transport is configured to be responsible for the address.",
null)
@ -74,10 +87,13 @@ private[remote] object Remoting {
"so that only one transport is responsible for the address.",
null)
}
case None throw new RemoteTransportException(s"No transport is loaded for protocol: ${remote.protocol}", null)
case None throw new RemoteTransportException(
s"No transport is loaded for protocol: ${remote.protocol}, available protocols: ${transportMapping.keys.mkString}", null)
}
}
case class RegisterTransportActor(props: Props, name: String)
}
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
@ -90,6 +106,16 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
private val settings = new RemotingSettings(provider.remoteSettings.config)
val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
override def supervisorStrategy = OneForOneStrategy() {
case NonFatal(e) Restart
}
def receive = {
case RegisterTransportActor(props, name) sender ! context.actorOf(props, name)
}
}), "transports")
override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)
val log: LoggingAdapter = Logging(system.eventStream, "Remoting")
@ -164,6 +190,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager.tell(Send(message, senderOption, recipient), sender = Actor.noSender)
}
override def managementCommand(cmd: Any): Future[Boolean] = {
val statusPromise = Promise[Boolean]()
endpointManager.tell(ManagementCommand(cmd, statusPromise), sender = Actor.noSender)
statusPromise.future
}
// Not used anywhere only to keep compatibility with RemoteTransport interface
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
@ -182,6 +214,8 @@ private[remote] object EndpointManager {
override def toString = s"Remote message $senderOption -> $recipient"
}
case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand
sealed trait EndpointPolicy
case class Pass(endpoint: ActorRef) extends EndpointPolicy
case class Gated(timeOfFailure: Long) extends EndpointPolicy
@ -299,10 +333,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
context.stop(self)
}
case ManagementCommand(_, statusPromise) statusPromise.success(false)
case StartupFinished context.become(accepting)
}
val accepting: Receive = {
case ManagementCommand(cmd, statusPromise)
transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address
@ -337,11 +376,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
private def initializeTransports(addressesPromise: Promise[Set[(Transport, Address)]]): Unit = {
val transports = for ((fqn, config) settings.Transports) yield {
val transports = for ((fqn, adapters, config) settings.Transports) yield {
val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config)
val wrappedTransport = extendedSystem.dynamicAccess
val driver = extendedSystem.dynamicAccess
.createInstanceFor[Transport](fqn, args).recover({
case exception throw new IllegalArgumentException(
@ -351,11 +390,17 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}).get
val wrappedTransport =
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) {
(t: Transport, provider: TransportAdapterProvider)
provider(t, context.system.asInstanceOf[ExtendedActorSystem])
}
new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
}
val listens: Future[Seq[(Transport, (Address, Promise[ActorRef]))]] = Future.sequence(
val listens: Future[Seq[(Transport, (Address, Promise[AssociationEventListener]))]] = Future.sequence(
transports.map { transport transport.listen map (transport -> _) })
listens.onComplete {
@ -394,7 +439,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
.withDispatcher("akka.remoting.writer-dispatcher"),
"endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next())
context.watch(endpoint) // TODO: see what to do with this
context.watch(endpoint)
}

View file

@ -0,0 +1,151 @@
package akka.remote.transport
import scala.language.postfixOps
import akka.actor._
import akka.pattern.ask
import akka.remote.transport.Transport._
import akka.remote.{ RemotingSettings, RemoteActorRefProvider }
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Promise, Future }
import scala.util.Success
import scala.util.Failure
import akka.remote.Remoting.RegisterTransportActor
import akka.util.Timeout
import scala.concurrent.duration._
trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) Transport)
class TransportAdapters(system: ExtendedActorSystem) extends Extension {
val settings = new RemotingSettings(system.provider.asInstanceOf[RemoteActorRefProvider].remoteSettings.config)
private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) settings.Adapters) yield {
name -> system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({
case exception throw new IllegalArgumentException("Cannot instantiate transport adapter" + fqn, exception)
}).get
}
def getAdapterProvider(name: String): TransportAdapterProvider = adaptersTable.get(name) match {
case Some(provider) provider
case None throw new IllegalArgumentException("There is no registered transport adapter provider with name: " + name)
}
}
object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with ExtensionIdProvider {
override def get(system: ActorSystem): TransportAdapters = super.get(system)
override def lookup = TransportAdaptersExtension
override def createExtension(system: ExtendedActorSystem): TransportAdapters =
new TransportAdapters(system)
}
trait SchemeAugmenter {
protected def addedSchemeIdentifier: String
protected def augmentScheme(originalScheme: String): String = s"$originalScheme.$addedSchemeIdentifier"
protected def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol))
protected def removeScheme(scheme: String): String = if (scheme.endsWith(s".$addedSchemeIdentifier"))
scheme.take(scheme.length - addedSchemeIdentifier.length - 1)
else scheme
protected def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol))
}
/**
* An adapter that wraps a transport and provides interception
*/
abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport, implicit val ec: ExecutionContext)
extends Transport with SchemeAugmenter {
protected def maximumOverhead: Int
protected def interceptListen(listenAddress: Address,
listenerFuture: Future[AssociationEventListener]): AssociationEventListener
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit
override def schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier)
override def isResponsibleFor(address: Address): Boolean = wrappedTransport.isResponsibleFor(address)
override def maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - maximumOverhead
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise()
val upstreamListenerPromise: Promise[AssociationEventListener] = Promise()
wrappedTransport.listen.onComplete {
case Success((listenAddress, listenerPromise))
// Register to downstream
listenerPromise.success(interceptListen(listenAddress, upstreamListenerPromise.future))
// Notify upstream
listenPromise.success((augmentScheme(listenAddress), upstreamListenerPromise))
case Failure(reason) listenPromise.failure(reason)
}
listenPromise.future
}
override def associate(remoteAddress: Address): Future[Status] = {
// Prepare a future, and pass its promise to the manager
val statusPromise: Promise[Status] = Promise()
interceptAssociate(removeScheme(remoteAddress), statusPromise)
statusPromise.future
}
override def shutdown(): Unit = wrappedTransport.shutdown()
}
abstract class AbstractTransportAdapterHandle(val originalLocalAddress: Address,
val originalRemoteAddress: Address,
val wrappedHandle: AssociationHandle,
val addedSchemeIdentifier: String) extends AssociationHandle
with SchemeAugmenter {
def this(wrappedHandle: AssociationHandle, addedSchemeIdentifier: String) =
this(wrappedHandle.localAddress,
wrappedHandle.remoteAddress,
wrappedHandle,
addedSchemeIdentifier)
override val localAddress = augmentScheme(originalLocalAddress)
override val remoteAddress = augmentScheme(originalRemoteAddress)
}
object ActorTransportAdapter {
sealed trait TransportOperation
case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation
case class ListenUnderlying(listenAddress: Address,
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
case object DisassociateUnderlying extends TransportOperation
}
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
extends AbstractTransportAdapter(wrappedTransport, system.dispatcher) {
import ActorTransportAdapter._
private implicit val timeout = new Timeout(3 seconds)
protected def managerName: String
protected def managerProps: Props
// The blocking call below is only called during the startup sequence.
protected val manager = Await.result(registerManager(), 3 seconds)
private def registerManager(): Future[ActorRef] =
(system.actorFor("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef]
protected def interceptListen(listenAddress: Address,
listenerPromise: Future[AssociationEventListener]): AssociationEventListener = {
manager ! ListenUnderlying(listenAddress, listenerPromise)
manager
}
override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit =
manager ! AssociateUnderlying(remoteAddress, statusPromise)
override def shutdown(): Unit = manager ! PoisonPill
}

View file

@ -18,6 +18,7 @@ import scala.util.control.NonFatal
import scala.util.{ Success, Failure }
import java.net.URLEncoder
import scala.collection.immutable.Queue
import akka.remote.transport.ActorTransportAdapter._
class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
@ -50,21 +51,6 @@ private[remote] object AkkaProtocolTransport {
val AkkaOverhead: Int = 0 //Don't know yet
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
sealed trait TransportOperation
case class HandlerRegistered(handler: ActorRef) extends TransportOperation
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation
case class ListenUnderlying(listenPromise: Promise[(Address, Promise[ActorRef])]) extends TransportOperation
case object DisassociateUnderlying extends TransportOperation
def augmentScheme(originalScheme: String): String = s"$originalScheme.$AkkaScheme"
def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol))
def removeScheme(scheme: String): String = if (scheme.endsWith(s".$AkkaScheme"))
scheme.take(scheme.length - AkkaScheme.length - 1)
else scheme
def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol))
}
/**
@ -92,42 +78,19 @@ private[remote] object AkkaProtocolTransport {
* the codec that will be used to encode/decode Akka PDUs
*/
private[remote] class AkkaProtocolTransport(
private val wrappedTransport: Transport,
wrappedTransport: Transport,
private val system: ActorSystem,
private val settings: AkkaProtocolSettings,
private val codec: AkkaPduCodec) extends Transport {
private val codec: AkkaPduCodec) extends ActorTransportAdapter(wrappedTransport, system) {
override val schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier)
override val addedSchemeIdentifier: String = AkkaScheme
override def isResponsibleFor(address: Address): Boolean = wrappedTransport.isResponsibleFor(removeScheme(address))
//TODO: make this the child of someone more appropriate
private val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf(
Props(new AkkaProtocolManager(wrappedTransport, settings)),
s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}")
override val maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - AkkaProtocolTransport.AkkaOverhead
override def listen: Future[(Address, Promise[ActorRef])] = {
// Prepare a future, and pass its promise to the manager
val listenPromise: Promise[(Address, Promise[ActorRef])] = Promise()
manager ! ListenUnderlying(listenPromise)
listenPromise.future
}
override def associate(remoteAddress: akka.actor.Address): Future[Status] = {
// Prepare a future, and pass its promise to the manager
val statusPromise: Promise[Status] = Promise()
manager ! AssociateUnderlying(remoteAddress, statusPromise)
statusPromise.future
}
override def shutdown(): Unit = manager ! PoisonPill
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit =
wrappedTransport.managementCommand(cmd, statusPromise)
override val maximumOverhead: Int = AkkaProtocolTransport.AkkaOverhead
protected def managerName = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
protected def managerProps = Props(new AkkaProtocolManager(wrappedTransport, settings))
}
private[transport] class AkkaProtocolManager(
@ -145,32 +108,17 @@ private[transport] class AkkaProtocolManager(
private val nextId = Iterator from 0
private val associationHandlerPromise: Promise[ActorRef] = Promise()
associationHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self
var localAddress: Address = _
@volatile var localAddress: Address = _
private var associationHandler: ActorRef = _
private var associationHandler: AssociationEventListener = _
def receive: Receive = {
case ListenUnderlying(listenPromise)
val listenFuture = wrappedTransport.listen
case ListenUnderlying(listenAddress, upstreamListenerFuture)
localAddress = listenAddress
upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self
// - Receive the address and promise from original transport
// - then register ourselves as listeners
// - then complete the exposed promise with the modified contents
listenFuture.onComplete {
case Success((address, wrappedTransportHandlerPromise))
// Register ourselves as the handler for the wrapped transport's listen call
wrappedTransportHandlerPromise.success(self)
localAddress = address
// Pipe the result to the original caller
listenPromise.success((augmentScheme(address), associationHandlerPromise))
case Failure(reason) listenPromise.failure(reason)
}
case HandlerRegistered(handler)
associationHandler = handler
case ListenerRegistered(listener)
associationHandler = listener
context.become(ready)
// Block inbound associations until handler is registered
@ -215,13 +163,13 @@ private[transport] class AkkaProtocolManager(
}
private[transport] class AkkaProtocolHandle(
val localAddress: Address,
val remoteAddress: Address,
val readHandlerPromise: Promise[ActorRef],
private val wrappedHandle: AssociationHandle,
_localAddress: Address,
_remoteAddress: Address,
val readHandlerPromise: Promise[HandleEventListener],
_wrappedHandle: AssociationHandle,
private val stateActor: ActorRef,
private val codec: AkkaPduCodec)
extends AssociationHandle {
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) {
override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))
@ -256,6 +204,8 @@ private[transport] object ProtocolStateActor {
case object HeartbeatTimer
case class HandleListenerRegistered(listener: HandleEventListener)
sealed trait ProtocolStateData
trait InitialProtocolStateData extends ProtocolStateData
@ -268,14 +218,15 @@ private[transport] object ProtocolStateActor {
extends ProtocolStateData
// The underlying transport is associated, but the handshake of the akka protocol is not yet finished
case class InboundUnassociated(associationHandler: ActorRef, wrappedHandle: AssociationHandle)
case class InboundUnassociated(associationListener: AssociationEventListener, wrappedHandle: AssociationHandle)
extends InitialProtocolStateData
// Both transports are associated, but the handler for the handle has not yet been provided
case class AssociatedWaitHandler(handlerFuture: Future[ActorRef], wrappedHandle: AssociationHandle, queue: Queue[ByteString])
case class AssociatedWaitHandler(handleListener: Future[HandleEventListener], wrappedHandle: AssociationHandle,
queue: Queue[ByteString])
extends ProtocolStateData
case class HandlerReady(handler: ActorRef, wrappedHandle: AssociationHandle)
case class ListenerReady(listener: HandleEventListener, wrappedHandle: AssociationHandle)
extends ProtocolStateData
case object TimeoutReason
@ -305,16 +256,16 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
// Inbound case
def this(localAddress: Address,
wrappedHandle: AssociationHandle,
associationHandler: ActorRef,
associationListener: AssociationEventListener,
settings: AkkaProtocolSettings,
codec: AkkaPduCodec,
failureDetector: FailureDetector) = {
this(InboundUnassociated(associationHandler, wrappedHandle), localAddress, settings, codec, failureDetector)
this(InboundUnassociated(associationListener, wrappedHandle), localAddress, settings, codec, failureDetector)
}
initialData match {
case d: OutboundUnassociated
d.transport.associate(removeScheme(d.remoteAddress)) pipeTo self
d.transport.associate(d.remoteAddress) pipeTo self
startWith(Closed, d)
case d: InboundUnassociated
@ -421,8 +372,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)
// Queue message until handler is registered
stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ payload)
case HandlerReady(handler, _)
handler ! InboundPayload(payload)
case ListenerReady(listener, _)
listener notify InboundPayload(payload)
stay()
}
@ -430,15 +381,19 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
case Event(HeartbeatTimer, AssociatedWaitHandler(_, wrappedHandle, _)) handleTimers(wrappedHandle)
case Event(HeartbeatTimer, HandlerReady(_, wrappedHandle)) handleTimers(wrappedHandle)
case Event(HeartbeatTimer, ListenerReady(_, wrappedHandle)) handleTimers(wrappedHandle)
case Event(DisassociateUnderlying, HandlerReady(handler, wrappedHandle))
sendDisassociate(wrappedHandle)
case Event(DisassociateUnderlying, _)
val handle = stateData match {
case ListenerReady(_, wrappedHandle) wrappedHandle
case AssociatedWaitHandler(_, wrappedHandle, _) wrappedHandle
}
sendDisassociate(handle)
stop()
case Event(HandlerRegistered(ref), AssociatedWaitHandler(_, wrappedHandle, queue))
queue.foreach { ref ! InboundPayload(_) }
stay() using HandlerReady(ref, wrappedHandle)
case Event(HandleListenerRegistered(listener), AssociatedWaitHandler(_, wrappedHandle, queue))
queue.foreach { listener notify InboundPayload(_) }
stay() using ListenerReady(listener, wrappedHandle)
}
private def initTimers(): Unit = {
@ -477,25 +432,26 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
// Invalidate exposed but still unfinished promise. The underlying association disappeared, so after
// registration immediately signal a disassociate
handlerFuture.onSuccess {
case handler: ActorRef handler ! Disassociated
case listener: HandleEventListener listener notify Disassociated
}
case StopEvent(_, _, HandlerReady(handler, wrappedHandle))
handler ! Disassociated
case StopEvent(_, _, ListenerReady(handler, wrappedHandle))
handler notify Disassociated
wrappedHandle.disassociate()
case StopEvent(_, _, InboundUnassociated(_, wrappedHandle))
wrappedHandle.disassociate()
}
private def notifyOutboundHandler(wrappedHandle: AssociationHandle, statusPromise: Promise[Status]): Future[ActorRef] = {
val readHandlerPromise: Promise[ActorRef] = Promise()
readHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self
private def notifyOutboundHandler(wrappedHandle: AssociationHandle,
statusPromise: Promise[Status]): Future[HandleEventListener] = {
val readHandlerPromise: Promise[HandleEventListener] = Promise()
readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
val exposedHandle =
new AkkaProtocolHandle(
augmentScheme(localAddress),
augmentScheme(wrappedHandle.remoteAddress),
localAddress,
wrappedHandle.remoteAddress,
readHandlerPromise,
wrappedHandle,
self,
@ -505,20 +461,22 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
readHandlerPromise.future
}
private def notifyInboundHandler(wrappedHandle: AssociationHandle, originAddress: Address, associationHandler: ActorRef): Future[ActorRef] = {
val readHandlerPromise: Promise[ActorRef] = Promise()
readHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self
private def notifyInboundHandler(wrappedHandle: AssociationHandle,
originAddress: Address,
associationListener: AssociationEventListener): Future[HandleEventListener] = {
val readHandlerPromise: Promise[HandleEventListener] = Promise()
readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
val exposedHandle =
new AkkaProtocolHandle(
augmentScheme(localAddress),
augmentScheme(originAddress),
localAddress,
originAddress,
readHandlerPromise,
wrappedHandle,
self,
codec)
associationHandler ! InboundAssociation(exposedHandle)
associationListener notify InboundAssociation(exposedHandle)
readHandlerPromise.future
}

View file

@ -0,0 +1,138 @@
package akka.remote.transport
import FailureInjectorTransportAdapter._
import akka.AkkaException
import akka.actor.{ Address, ExtendedActorSystem }
import akka.event.Logging
import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener }
import akka.remote.transport.Transport._
import akka.util.ByteString
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.{ Future, Promise }
case class FailureInjectorException(msg: String) extends AkkaException(msg)
class FailureInjectorProvider extends TransportAdapterProvider {
def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
new FailureInjectorTransportAdapter(wrappedTransport, system)
}
private[remote] object FailureInjectorTransportAdapter {
val FailureInjectorSchemeIdentifier = "gremlin"
trait FailureInjectorCommand
case class All(mode: GremlinMode)
case class One(remoteAddress: Address, mode: GremlinMode)
sealed trait GremlinMode
case object PassThru extends GremlinMode
case class Drop(outboundDropP: Double, inboundDropP: Double) extends GremlinMode
}
private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem)
extends AbstractTransportAdapter(wrappedTransport, extendedSystem.dispatcher) with AssociationEventListener {
import extendedSystem.dispatcher
private val rng = ThreadLocalRandom.current()
private val log = Logging(extendedSystem, "FailureInjector (gremlin)")
@volatile private var upstreamListener: Option[AssociationEventListener] = None
private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]()
@volatile private var allMode: GremlinMode = PassThru
override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier
protected def maximumOverhead = 0
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case All(mode)
allMode = mode
statusPromise.success(true)
case One(address, mode)
// don't care about the protocol part - we are injected in the stack anyway!
addressChaosTable.put(address.copy(protocol = "", system = ""), mode)
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
}
protected def interceptListen(listenAddress: Address,
listenerFuture: Future[AssociationEventListener]): AssociationEventListener = {
log.warning("FailureInjectorTransport is active on this system. Gremlins might munch your packets.")
listenerFuture.onSuccess {
case listener: AssociationEventListener upstreamListener = Some(listener)
}
this
}
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = {
// Association is simulated to be failed if there was either an inbound or outbound message drop
if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress))
statusPromise.success(Fail(new FailureInjectorException("Simulated failure of association to " + remoteAddress)))
else
statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map {
_ match {
case Ready(handle)
addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru)
Ready(new FailureInjectorHandle(handle, this))
case s: Status s
}
})
}
def notify(ev: AssociationEvent): Unit = ev match {
case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress) //Ignore
case _ upstreamListener match {
case Some(listener) listener notify interceptInboundAssociation(ev)
case None
}
}
def interceptInboundAssociation(ev: AssociationEvent): AssociationEvent = ev match {
case InboundAssociation(handle) InboundAssociation(FailureInjectorHandle(handle, this))
case _ ev
}
def shouldDropInbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match {
case PassThru false
case Drop(_, inboundDropP) rng.nextDouble() <= inboundDropP
}
def shouldDropOutbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match {
case PassThru false
case Drop(outboundDropP, _) rng.nextDouble() <= outboundDropP
}
def chaosMode(remoteAddress: Address): GremlinMode = {
val mode = addressChaosTable.get(remoteAddress.copy(protocol = "", system = ""))
if (mode eq null) PassThru else mode
}
}
private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHandle,
private val gremlinAdapter: FailureInjectorTransportAdapter)
extends AbstractTransportAdapterHandle(_wrappedHandle, FailureInjectorSchemeIdentifier)
with HandleEventListener {
import gremlinAdapter.extendedSystem.dispatcher
@volatile private var upstreamListener: HandleEventListener = null
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
readHandlerPromise.future.onSuccess {
case listener: HandleEventListener
upstreamListener = listener
wrappedHandle.readHandlerPromise.success(this)
}
override def write(payload: ByteString): Boolean = if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress))
wrappedHandle.write(payload)
else true
override def disassociate(): Unit = wrappedHandle.disassociate()
override def notify(ev: HandleEvent): Unit = if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress))
upstreamListener notify ev
}

View file

@ -10,9 +10,159 @@ import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
// Default EC is used, but this is just a test utility -- please forgive...
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Transport implementation to be used for testing.
*
* The TestTransport is basically a shared memory between actor systems. The TestTransport could be programmed to
* emulate different failure modes of a Transport implementation. TestTransport keeps a log of the activities it was
* requested to do. This class is not optimized for performace and MUST not be used as an in-memory transport in
* production systems.
*/
class TestTransport(
val localAddress: Address,
final val registry: AssociationRegistry,
val maximumPayloadBytes: Int = 32000,
val schemeIdentifier: String = "test") extends Transport {
def this(system: ExtendedActorSystem, conf: Config) = {
this(
AddressFromURIString(conf.getString("local-address")),
AssociationRegistry.get(conf.getString("registry-key")),
conf.getBytes("maximum-payload-bytes").toInt,
conf.getString("scheme-identifier"))
}
import akka.remote.transport.TestTransport._
override def isResponsibleFor(address: Address): Boolean = true
private val associationListenerPromise = Promise[AssociationEventListener]()
private def defaultListen: Future[(Address, Promise[AssociationEventListener])] = {
associationListenerPromise.future.onSuccess {
case listener: AssociationEventListener registry.registerTransport(this, listener)
}
Promise.successful((localAddress, associationListenerPromise)).future
}
private def defaultAssociate(remoteAddress: Address): Future[Status] = {
registry.transportFor(remoteAddress) match {
case Some((remoteTransport, listener))
val (localHandle, remoteHandle) = createHandlePair(remoteTransport, remoteAddress)
val bothSides: Future[(HandleEventListener, HandleEventListener)] = for (
listener1 localHandle.readHandlerPromise.future;
listener2 remoteHandle.readHandlerPromise.future
) yield (listener1, listener2)
registry.registerListenerPair(localHandle.key, bothSides)
listener notify InboundAssociation(remoteHandle)
Promise.successful(Ready(localHandle)).future
case None
Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future
}
}
private def createHandlePair(remoteTransport: TestTransport, remoteAddress: Address): (TestAssociationHandle, TestAssociationHandle) = {
val localHandle = new TestAssociationHandle(localAddress, remoteAddress, this, inbound = false)
val remoteHandle = new TestAssociationHandle(remoteAddress, localAddress, remoteTransport, inbound = true)
(localHandle, remoteHandle)
}
private def defaultShutdown: Future[Unit] = Promise.successful(()).future
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
*/
val listenBehavior = new SwitchableLoggedBehavior[Unit, (Address, Promise[AssociationEventListener])](
(_) defaultListen,
(_) registry.logActivity(ListenAttempt(localAddress)))
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method.
*/
val associateBehavior = new SwitchableLoggedBehavior[Address, Status](
defaultAssociate _,
(remoteAddress) registry.logActivity(AssociateAttempt(localAddress, remoteAddress)))
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method.
*/
val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit](
(_) defaultShutdown,
(_) registry.logActivity(ShutdownAttempt(localAddress)))
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior()
override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress)
override def shutdown(): Unit = shutdownBehavior()
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
registry.getRemoteReadHandlerFor(params._1) match {
case Some(futureActor)
val writePromise = Promise[Boolean]()
futureActor.onSuccess {
case listener listener notify InboundPayload(params._2); writePromise.success(true)
}
writePromise.future
case None
Promise.failed(new IllegalStateException("No association present")).future
}
}
private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = {
registry.deregisterAssociation(handle.key).foreach {
case f: Future[(HandleEventListener, HandleEventListener)] f.onSuccess {
case (listener1, listener2)
(if (handle.inbound) listener1 else listener2) notify Disassociated
}
}
Promise.successful(()).future
}
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the write() method on handles. All
* handle calls pass through this call. Please note, that write operations return a Boolean synchronously, so
* altering the behavior via pushDelayed will turn write to a blocking operation -- use of pushDelayed therefore
* is not recommended.
*/
val writeBehavior = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean](
defaultBehavior = {
defaultWrite _
},
logCallback = {
case (handle, payload)
registry.logActivity(WriteAttempt(handle.localAddress, handle.remoteAddress, payload))
})
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the disassociate() method on handles. All
* handle calls pass through this call.
*/
val disassociateBehavior = new SwitchableLoggedBehavior[TestAssociationHandle, Unit](
defaultBehavior = {
defaultDisassociate _
},
logCallback = {
(handle)
registry.logActivity(DisassociateAttempt(handle.localAddress, handle.remoteAddress))
})
private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean =
Await.result(writeBehavior((handle, payload)), 3 seconds)
private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle)
override def toString: String = s"TestTransport($localAddress)"
}
object TestTransport {
type Behavior[A, B] = (A) Future[B]
@ -133,8 +283,8 @@ object TestTransport {
class AssociationRegistry {
private val activityLog = new CopyOnWriteArrayList[Activity]()
private val transportTable = new ConcurrentHashMap[Address, (TestTransport, ActorRef)]()
private val handlersTable = new ConcurrentHashMap[(Address, Address), Future[(ActorRef, ActorRef)]]()
private val transportTable = new ConcurrentHashMap[Address, (TestTransport, AssociationEventListener)]()
private val listenersTable = new ConcurrentHashMap[(Address, Address), Future[(HandleEventListener, HandleEventListener)]]()
/**
* Logs a transport activity.
@ -167,15 +317,15 @@ object TestTransport {
}
/**
* Records a mapping between an address and the corresponding (transport, actor) pair.
* Records a mapping between an address and the corresponding (transport, associationEventListener) pair.
*
* @param transport
* The transport that is to be registered. The address of this transport will be used as key.
* @param responsibleActor
* The actor that will handle the events for the given transport.
* @param associationEventListener
* The listener that will handle the events for the given transport.
*/
def registerTransport(transport: TestTransport, responsibleActor: ActorRef): Unit = {
transportTable.put(transport.localAddress, (transport, responsibleActor))
def registerTransport(transport: TestTransport, associationEventListener: AssociationEventListener): Unit = {
transportTable.put(transport.localAddress, (transport, associationEventListener))
}
/**
@ -187,23 +337,23 @@ object TestTransport {
* @return
* True if all transports are successfully registered.
*/
def transportsReady(transports: TestTransport*): Boolean = {
transports forall {
t transportTable.containsKey(t.localAddress)
def transportsReady(addresses: Address*): Boolean = {
addresses forall {
transportTable.containsKey(_)
}
}
/**
* Registers a Future of two actors corresponding to the two endpoints of an association.
* Registers a Future of two handle event listeners corresponding to the two endpoints of an association.
*
* @param key
* Ordered pair of addresses representing an association. First element must be the address of the initiator.
* @param readHandlers
* The future containing the actors that will be responsible for handling the events of the two endpoints of the
* @param listeners
* The future containing the listeners that will be responsible for handling the events of the two endpoints of the
* association. Elements in the pair must be in the same order as the addresses in the key parameter.
*/
def registerHandlePair(key: (Address, Address), readHandlers: Future[(ActorRef, ActorRef)]): Unit = {
handlersTable.put(key, readHandlers)
def registerListenerPair(key: (Address, Address), listeners: Future[(HandleEventListener, HandleEventListener)]): Unit = {
listenersTable.put(key, listeners)
}
/**
@ -213,8 +363,8 @@ object TestTransport {
* @return
* The original entries.
*/
def deregisterAssociation(key: (Address, Address)): Option[Future[(ActorRef, ActorRef)]] =
Option(handlersTable.remove(key))
def deregisterAssociation(key: (Address, Address)): Option[Future[(HandleEventListener, HandleEventListener)]] =
Option(listenersTable.remove(key))
/**
* Tests if an association was registered.
@ -225,19 +375,19 @@ object TestTransport {
* @return True if there is an association for the given addresses.
*/
def existsAssociation(initiatorAddress: Address, remoteAddress: Address): Boolean = {
handlersTable.containsKey((initiatorAddress, remoteAddress))
listenersTable.containsKey((initiatorAddress, remoteAddress))
}
/**
* Returns the event handler actor corresponding to the remote endpoint of the given local handle. In other words
* it returns the actor that will receive InboundPayload events when {{{write()}}} is called on the given handle.
* Returns the event handler corresponding to the remote endpoint of the given local handle. In other words
* it returns the listener that will receive InboundPayload events when {{{write()}}} is called on the given handle.
*
* @param localHandle The handle
* @return The option that contains the Future for the handler actor if exists.
* @return The option that contains the Future for the listener if exists.
*/
def getRemoteReadHandlerFor(localHandle: TestAssociationHandle): Option[Future[ActorRef]] = {
Option(handlersTable.get(localHandle.key)) map {
case pairFuture: Future[(ActorRef, ActorRef)] if (localHandle.inbound) {
def getRemoteReadHandlerFor(localHandle: TestAssociationHandle): Option[Future[HandleEventListener]] = {
Option(listenersTable.get(localHandle.key)) map {
case pairFuture: Future[(HandleEventListener, HandleEventListener)] if (localHandle.inbound) {
pairFuture.map { _._1 }
} else {
pairFuture.map { _._2 }
@ -251,7 +401,8 @@ object TestTransport {
* @param address The address bound to the transport.
* @return The transport if exists.
*/
def transportFor(address: Address): Option[(TestTransport, ActorRef)] = Option(transportTable.get(address))
def transportFor(address: Address): Option[(TestTransport, AssociationEventListener)] =
Option(transportTable.get(address))
/**
* Resets the state of the registry. ''Warning!'' This method is not atomic.
@ -259,7 +410,7 @@ object TestTransport {
def reset(): Unit = {
clearLog()
transportTable.clear()
handlersTable.clear()
listenersTable.clear()
}
}
@ -271,7 +422,7 @@ object TestTransport {
of the shared instance must happen during the startup time of the actor system. Association registries are looked
up via a string key. Until we find a better way to inject an AssociationRegistry to multiple actor systems it is
strongly recommended to use long, randomly generated strings to key the registry to avoid interference between tests.
*/
*/
object AssociationRegistry {
private final val registries = scala.collection.mutable.Map[String, AssociationRegistry]()
@ -282,165 +433,13 @@ object AssociationRegistry {
def clear(): Unit = this.synchronized { registries.clear() }
}
/**
* Transport implementation to be used for testing.
*
* The TestTransport is basically a shared memory between actor systems. The TestTransport could be programmed to
* emulate different failure modes of a Transport implementation. TestTransport keeps a log of the activities it was
* requested to do. This class is not optimized for performace and MUST not be used as an in-memory transport in
* production systems.
*/
class TestTransport(
val localAddress: Address,
final val registry: AssociationRegistry,
val maximumPayloadBytes: Int = 32000,
val schemeIdentifier: String = "test") extends Transport {
def this(system: ExtendedActorSystem, conf: Config) = {
this(
AddressFromURIString(conf.getString("local-address")),
AssociationRegistry.get(conf.getString("registry-key")),
conf.getBytes("maximum-payload-bytes").toInt,
conf.getString("scheme-identifier"))
}
import akka.remote.transport.TestTransport._
override def isResponsibleFor(address: Address): Boolean = true
private val actorPromise = Promise[ActorRef]()
private def defaultListen: Future[(Address, Promise[ActorRef])] = {
actorPromise.future.onSuccess {
case actorRef: ActorRef registry.registerTransport(this, actorRef)
}
Promise.successful((localAddress, actorPromise)).future
}
private def defaultAssociate(remoteAddress: Address): Future[Status] = {
registry.transportFor(remoteAddress) match {
case Some((remoteTransport, actor))
val (localHandle, remoteHandle) = createHandlePair(remoteTransport, remoteAddress)
val bothSides: Future[(ActorRef, ActorRef)] = for (
actor1 localHandle.readHandlerPromise.future;
actor2 remoteHandle.readHandlerPromise.future
) yield (actor1, actor2)
registry.registerHandlePair(localHandle.key, bothSides)
actor ! InboundAssociation(remoteHandle)
Promise.successful(Ready(localHandle)).future
case None
Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future
}
}
private def createHandlePair(remoteTransport: TestTransport, remoteAddress: Address): (TestAssociationHandle, TestAssociationHandle) = {
val localHandle = new TestAssociationHandle(localAddress, remoteAddress, this, inbound = false)
val remoteHandle = new TestAssociationHandle(remoteAddress, localAddress, remoteTransport, inbound = true)
(localHandle, remoteHandle)
}
private def defaultShutdown: Future[Unit] = Promise.successful(()).future
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
*/
val listenBehavior = new SwitchableLoggedBehavior[Unit, (Address, Promise[ActorRef])](
(_) defaultListen,
(_) registry.logActivity(ListenAttempt(localAddress)))
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method.
*/
val associateBehavior = new SwitchableLoggedBehavior[Address, Status](
defaultAssociate _,
(remoteAddress) registry.logActivity(AssociateAttempt(localAddress, remoteAddress)))
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method.
*/
val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit](
(_) defaultShutdown,
(_) registry.logActivity(ShutdownAttempt(localAddress)))
override def listen: Future[(Address, Promise[ActorRef])] = listenBehavior()
override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress)
override def shutdown(): Unit = shutdownBehavior()
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
registry.getRemoteReadHandlerFor(params._1) match {
case Some(futureActor)
val writePromise = Promise[Boolean]()
futureActor.onSuccess {
case actor actor ! InboundPayload(params._2); writePromise.success(true)
}
writePromise.future
case None
Promise.failed(new IllegalStateException("No association present")).future
}
}
private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = {
registry.deregisterAssociation(handle.key).foreach {
case f: Future[(ActorRef, ActorRef)] f.onSuccess {
case (handler1, handler2)
val handler = if (handle.inbound) handler2 else handler1
handler ! Disassociated
}
}
Promise.successful(()).future
}
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the write() method on handles. All
* handle calls pass through this call. Please note, that write operations return a Boolean synchronously, so
* altering the behavior via pushDelayed will turn write to a blocking operation -- use of pushDelayed therefore
* is not recommended.
*/
val writeBehavior = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean](
defaultBehavior = {
defaultWrite _
},
logCallback = {
case (handle, payload)
registry.logActivity(WriteAttempt(handle.localAddress, handle.remoteAddress, payload))
})
/**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the disassociate() method on handles. All
* handle calls pass through this call.
*/
val disassociateBehavior = new SwitchableLoggedBehavior[TestAssociationHandle, Unit](
defaultBehavior = {
defaultDisassociate _
},
logCallback = {
(handle)
registry.logActivity(DisassociateAttempt(handle.localAddress, handle.remoteAddress))
})
private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean =
Await.result(writeBehavior((handle, payload)), 3 seconds)
private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle)
override def toString: String = s"TestTransport($localAddress)"
}
case class TestAssociationHandle(
localAddress: Address,
remoteAddress: Address,
transport: TestTransport,
inbound: Boolean) extends AssociationHandle {
override val readHandlerPromise: Promise[ActorRef] = Promise()
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
override def write(payload: ByteString): Boolean = transport.write(this, payload)

View file

@ -0,0 +1,425 @@
package akka.remote.transport
import ThrottlerTransportAdapter._
import akka.actor._
import akka.pattern.pipe
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottledAssociation._
import akka.remote.transport.ThrottlerManager.Checkin
import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle
import akka.remote.transport.Transport._
import akka.util.ByteString
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Promise
import scala.math.min
import scala.util.Success
import scala.util.control.NonFatal
import scala.concurrent.duration._
class ThrottlerProvider extends TransportAdapterProvider {
def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
new ThrottlerTransportAdapter(wrappedTransport, system)
}
object ThrottlerTransportAdapter {
val SchemeIdentifier = "trttl"
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
sealed trait Direction {
def includes(other: Direction): Boolean
}
object Direction {
case object Send extends Direction {
override def includes(other: Direction): Boolean = other match {
case Send true
case _ false
}
}
case object Receive extends Direction {
override def includes(other: Direction): Boolean = other match {
case Receive true
case _ false
}
}
case object Both extends Direction {
override def includes(other: Direction): Boolean = true
}
}
case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode)
sealed trait ThrottleMode {
def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
def timeToAvailable(currentTime: Long, tokens: Int): Long
}
case class TokenBucket(capacity: Int, tokensPerSecond: Double, lastSend: Long, availableTokens: Int)
extends ThrottleMode {
private def isAvailable(timeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
true // Allow messages larger than capacity through, it will be recorded as negative tokens
} else min((availableTokens + tokensGenerated(timeOfSend)), capacity) >= tokens
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
if (isAvailable(timeOfSend, tokens))
(this.copy(
lastSend = timeOfSend,
availableTokens = min(availableTokens - tokens + tokensGenerated(timeOfSend), capacity)), true)
else (this, false)
}
override def timeToAvailable(currentTime: Long, tokens: Int): Long = {
val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime)
TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong)
}
private def tokensGenerated(timeOfSend: Long): Int =
(TimeUnit.NANOSECONDS.toMillis(timeOfSend - lastSend) * tokensPerSecond / 1000.0).toInt
}
case object Unthrottled extends ThrottleMode {
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L
}
case object Blackhole extends ThrottleMode {
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
override def timeToAvailable(currentTime: Long, tokens: Int): Long = 0L
}
}
class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem)
extends ActorTransportAdapter(_wrappedTransport, _system) {
override protected def addedSchemeIdentifier = SchemeIdentifier
override protected def maximumOverhead = 0
protected def managerName = s"throttlermanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
protected def managerProps = Props(new ThrottlerManager(wrappedTransport))
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s @ SetThrottle(_, _, _)
manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
}
}
private[transport] object ThrottlerManager {
case class OriginResolved()
case class Checkin(origin: Address, handle: ThrottlerHandle)
}
private[transport] class ThrottlerManager(wrappedTransport: Transport) extends Actor {
import context.dispatcher
private val ids = Iterator from 0
var localAddress: Address = _
private var associationHandler: AssociationEventListener = _
private var throttlingModes = Map[Address, (ThrottleMode, Direction)]()
private var handleTable = Map[Address, ThrottlerHandle]()
private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "")
override def postStop(): Unit = wrappedTransport.shutdown()
def receive: Receive = {
case ListenUnderlying(listenAddress, upstreamListenerFuture)
localAddress = listenAddress
upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self
case ListenerRegistered(listener)
associationHandler = listener
context.become(ready)
// Block inbound associations until handler is registered
case InboundAssociation(handle)
handle.disassociate()
}
private def ready: Receive = {
case InboundAssociation(handle)
val wrappedHandle = wrapHandle(handle, true)
wrappedHandle.throttlerActor ! wrappedHandle
case AssociateUnderlying(remoteAddress, statusPromise)
wrappedTransport.associate(remoteAddress).onComplete {
case Success(Ready(handle))
val wrappedHandle = wrapHandle(handle, false)
val inMode = getInboundMode(nakedAddress(remoteAddress))
wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress)))
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
statusPromise.success(Ready(wrappedHandle))
case s @ _ statusPromise.complete(s)
}
case s @ SetThrottle(address, direction, mode)
val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction)
handleTable.get(naked) match {
case Some(handle) setMode(handle, mode, direction)
case None
}
case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
handleTable += naked -> handle
setMode(naked, handle)
}
private def getInboundMode(nakedAddress: Address): ThrottleMode = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) if direction.includes(Direction.Receive) mode
case _ Unthrottled
}
}
private def getOutboundMode(nakedAddress: Address): ThrottleMode = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) if direction.includes(Direction.Send) mode
case _ Unthrottled
}
}
private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both)
}
}
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
}
private def wrapHandle(originalHandle: AssociationHandle, inbound: Boolean): ThrottlerHandle = {
val throttlerActor = context.actorOf(Props(new ThrottledAssociation(self, associationHandler, originalHandle, inbound)),
"throttler" + ids.next())
val handle = ThrottlerHandle(originalHandle, throttlerActor)
handleTable += nakedAddress(originalHandle.remoteAddress) -> handle
handle
}
}
object ThrottledAssociation {
case object Dequeue
sealed trait ThrottlerState
// --- Chain of states for inbound associations
// Waiting for the ThrottlerHandle coupled with the throttler actor.
case object WaitExposedHandle extends ThrottlerState
// Waiting for the ASSOCIATE message that contains the origin address of the remote endpoint
case object WaitOrigin extends ThrottlerState
// After origin is known and a Checkin message is sent to the manager, we must wait for the ThrottlingMode for the
// address
case object WaitMode extends ThrottlerState
// After all information is known, the throttler must wait for the upstream listener to be able to forward messages
case object WaitUpstreamListener extends ThrottlerState
// --- States for outbound associations
// Waiting for the tuple containing the upstream listener and ThrottleMode
case object WaitModeAndUpstreamListener extends ThrottlerState
// Fully initialized state
case object Throttling extends ThrottlerState
sealed trait ThrottlerData
case object Uninitialized extends ThrottlerData
case class ExposedHandle(handle: ThrottlerHandle) extends ThrottlerData
}
private[transport] class ThrottledAssociation(
val manager: ActorRef,
val associationHandler: AssociationEventListener,
val originalHandle: AssociationHandle,
val inbound: Boolean)
extends Actor with LoggingFSM[ThrottlerState, ThrottlerData] {
import context.dispatcher
var inboundThrottleMode: ThrottleMode = _
var queue = Queue.empty[ByteString]
var upstreamListener: HandleEventListener = _
override def postStop(): Unit = originalHandle.disassociate()
if (inbound) startWith(WaitExposedHandle, Uninitialized) else {
originalHandle.readHandlerPromise.success(self)
startWith(WaitModeAndUpstreamListener, Uninitialized)
}
when(WaitExposedHandle) {
case Event(handle: ThrottlerHandle, Uninitialized)
// register to downstream layer and wait for origin
originalHandle.readHandlerPromise.success(self)
goto(WaitOrigin) using ExposedHandle(handle)
}
when(WaitOrigin) {
case Event(InboundPayload(p), ExposedHandle(exposedHandle))
queue = queue enqueue p
peekOrigin(p) match {
case Some(origin)
manager ! Checkin(origin, exposedHandle)
goto(WaitMode)
case None stay()
}
}
when(WaitMode) {
case Event(InboundPayload(p), _)
queue = queue enqueue p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
queue = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
} else {
associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener)
}
}
when(WaitUpstreamListener) {
case Event(InboundPayload(p), _)
queue = queue enqueue p
stay()
case Event(listener: HandleEventListener, _)
upstreamListener = listener
self ! Dequeue
goto(Throttling)
}
when(WaitModeAndUpstreamListener) {
case Event((listener: HandleEventListener, mode: ThrottleMode), _)
upstreamListener = listener
inboundThrottleMode = mode
self ! Dequeue
goto(Throttling)
case Event(InboundPayload(p), _)
queue = queue enqueue p
stay()
}
when(Throttling) {
case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString]
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)
stay()
case Event(Dequeue, _)
if (!queue.isEmpty) {
val (payload, newqueue) = queue.dequeue
upstreamListener notify InboundPayload(payload)
queue = newqueue
inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1
if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue
else if (!queue.isEmpty) {
context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length) nanoseconds, self, Dequeue)
}
}
stay()
}
whenUnhandled {
case Event(Disassociated, _)
if (upstreamListener ne null) upstreamListener notify Disassociated
originalHandle.disassociate()
stop()
}
// This method captures ASSOCIATE packets and extracts the origin address
private def peekOrigin(b: ByteString): Option[Address] = {
try {
AkkaPduProtobufCodec.decodePdu(b) match {
case Associate(_, origin) Some(origin)
case _ None
}
} catch {
// This layer should not care about malformed packets. Also, this also useful for testing, because
// arbitrary payload could be passed in
case NonFatal(e) None
}
}
def forwardOrDelay(payload: ByteString): Unit = {
if (inboundThrottleMode == Blackhole) {
// Do nothing
} else {
if (queue.isEmpty) {
val tokens = payload.length
val (newbucket, success) = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), tokens)
if (success) {
inboundThrottleMode = newbucket
upstreamListener notify InboundPayload(payload)
} else {
queue = queue.enqueue(payload)
context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens) nanoseconds, self, Dequeue)
}
} else {
queue = queue.enqueue(payload)
}
}
}
}
private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef)
extends AbstractTransportAdapterHandle(_wrappedHandle, SchemeIdentifier) {
private[transport] val outboundThrottleMode = new AtomicReference[ThrottleMode](Unthrottled)
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
override def write(payload: ByteString): Boolean = {
val tokens = payload.length
@tailrec def tryConsume(currentBucket: ThrottleMode): Boolean = {
val timeOfSend = System.nanoTime()
val (newBucket, allow) = currentBucket.tryConsumeTokens(timeOfSend, tokens)
if (allow) {
if (outboundThrottleMode.compareAndSet(currentBucket, newBucket)) true
else tryConsume(outboundThrottleMode.get())
} else false
}
outboundThrottleMode.get match {
case Blackhole true
case bucket @ _
val success = tryConsume(outboundThrottleMode.get())
if (success) wrappedHandle.write(payload)
success
}
}
override def disassociate(): Unit = {
throttlerActor ! PoisonPill
}
}

View file

@ -3,23 +3,27 @@ package akka.remote.transport
import concurrent.{ Promise, Future }
import akka.actor.{ ActorRef, Address }
import akka.util.ByteString
import akka.remote.transport.Transport.AssociationEvent
import akka.remote.transport.AssociationHandle.HandleEventListener
object Transport {
trait AssociationEvent
/**
* Represents fine grained status of an association attempt.
*/
sealed trait Status
sealed trait Status extends AssociationEvent
/**
* Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address,
* hostname, etc.). Invalid association requests are impossible to recover.
* hostname, etc.).
*/
case class Invalid(cause: Throwable) extends Status
/**
* The association setup has failed, but no information can be provided about the probability of the success of a
* setup retry.
* The association setup has failed, but it is not known that a recovery is possible or not. Generally it means
* that the transport gave up its attempts to associate, but a retry might be successful at a later time.
*
* @param cause Cause of the failure
*/
@ -36,19 +40,42 @@ object Transport {
case class Ready(association: AssociationHandle) extends Status
/**
* Message sent to an actor registered to a transport (via the Promise returned by
* [[akka.remote.transport.Transport.listen]]) when an inbound association request arrives.
* Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport
* (via the Promise returned by [[akka.remote.transport.Transport.listen]]) when an inbound association request arrives.
*
* @param association
* The handle for the inbound association.
*/
case class InboundAssociation(association: AssociationHandle)
case class InboundAssociation(association: AssociationHandle) extends AssociationEvent
/**
* An interface that needs to be implemented by the user of a transport to listen to association events
*/
trait AssociationEventListener {
/**
* Called by the transport to notify the listener about an AssociationEvent
* @param ev The AssociationEvent of the transport
*/
def notify(ev: AssociationEvent): Unit
}
/**
* Class to convert ordinary [[akka.actor.ActorRef]] instances to an AssociationEventListener. The adapter will
* forward event objects as messages to the provided ActorRef.
* @param actor
*/
case class ActorAssociationEventListener(actor: ActorRef) extends AssociationEventListener {
override def notify(ev: AssociationEvent): Unit = actor ! ev
}
implicit def actorRef2HandleEventListener(actor: ActorRef): AssociationEventListener =
ActorAssociationEventListener(actor)
}
/**
* An SPI layer for implementing asynchronous transport mechanisms. The transport is responsible for initializing the
* underlying transport mechanism and setting up logical links between transport entities.
* An SPI layer for implementing asynchronous transport mechanisms. The Transport is responsible for initializing the
* underlying transmission mechanism and setting up logical links between transport entities.
*
* Transport implementations that are loaded dynamically by the remoting must have a constructor that accepts a
* [[com.typesafe.config.Config]] and an [[akka.actor.ExtendedActorSystem]] as parameters.
@ -86,14 +113,15 @@ trait Transport {
/**
* Asynchronously attempts to setup the transport layer to listen and accept incoming associations. The result of the
* attempt is wrapped by a Future returned by this method. The pair contained in the future contains a Promise for an
* ActorRef. By completing this Promise with an ActorRef, that ActorRef becomes responsible for handling incoming
* associations. Until the Promise is not completed, no associations are processed.
* ActorRef. By completing this Promise with an [[akka.remote.transport.Transport.AssociationEventListener]], that
* listener becomes responsible for handling incoming associations. Until the Promise is not completed, no associations
* are processed.
*
* @return
* A Future containing a pair of the bound local address and a Promise of an ActorRef that must be fulfilled
* by the consumer of the future.
* A Future containing a pair of the bound local address and a Promise of an AssociationListener that must be
* completed by the consumer of the future.
*/
def listen: Future[(Address, Promise[ActorRef])]
def listen: Future[(Address, Promise[AssociationEventListener])]
/**
* Asynchronously opens a logical duplex link between two Transport Entities over a network. It could be backed by a
@ -118,36 +146,69 @@ trait Transport {
*/
def shutdown(): Unit
/**
* This method allows upper layers to send management commands to the transport. It is the responsibility of the
* sender to send appropriate commands to different transport implementations. Unknown commands will be ignored.
*
* @param cmd Command message to the transport
* @return Future that succeeds when the command was handled or dropped
*/
def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = { statusPromise.success(false) }
}
object AssociationHandle {
/**
* Trait for events that the registered actor for an [[akka.remote.transport.AssociationHandle]] might receive.
* Trait for events that the registered listener for an [[akka.remote.transport.AssociationHandle]] might receive.
*/
sealed trait AssociationEvent
sealed trait HandleEvent
/**
* Message sent to the actor registered to an association (via the Promise returned by
* Message sent to the listener registered to an association (via the Promise returned by
* [[akka.remote.transport.AssociationHandle.readHandlerPromise]]) when an inbound payload arrives.
*
* @param payload
* The raw bytes that were sent by the remote endpoint.
*/
case class InboundPayload(payload: ByteString) extends AssociationEvent
case class InboundPayload(payload: ByteString) extends HandleEvent {
override def toString: String = s"InboundPayload(size = ${payload.length} bytes)"
}
/**
* Message sent to te actor registered to an association
* Message sent to the listener registered to an association
*/
case object Disassociated extends AssociationEvent
case object Disassociated extends HandleEvent
/**
* An interface that needs to be implemented by the user of an [[akka.remote.transport.AssociationHandle]]
* to listen to association events.
*/
trait HandleEventListener {
/**
* Called by the transport to notify the listener about a HandleEvent
* @param ev The HandleEvent of the handle
*/
def notify(ev: HandleEvent): Unit
}
/**
* Class to convert ordinary [[akka.actor.ActorRef]] instances to a HandleEventListener. The adapter will
* forward event objects as messages to the provided ActorRef.
* @param actor
*/
case class ActorHandleEventListener(actor: ActorRef) extends HandleEventListener {
override def notify(ev: HandleEvent): Unit = actor ! ev
}
implicit def actorRef2HandleEventListener(actor: ActorRef): HandleEventListener = ActorHandleEventListener(actor)
}
/**
* An SPI layer for abstracting over logical links (associations) created by [[akka.remote.transport.Transport]].
* An SPI layer for abstracting over logical links (associations) created by a [[akka.remote.transport.Transport]].
* Handles are responsible for providing an API for sending and receiving from the underlying channel.
*
* To register an actor for processing incoming payload data, the actor must be registered by completing the Promise
* To register a listener for processing incoming payload data, the listener must be registered by completing the Promise
* returned by [[akka.remote.transport.AssociationHandle#readHandlerPromise]]. Incoming data is not processed until
* this registration takes place.
*/
@ -170,22 +231,23 @@ trait AssociationHandle {
def remoteAddress: Address
/**
* The Promise returned by this call must be completed with an [[akka.actor.ActorRef]] to register an actor
* responsible for handling incoming payload.
* The Promise returned by this call must be completed with an [[akka.remote.transport.AssociationHandle.HandleEventListener]]
* to register a listener responsible for handling incoming payload. Until the listener is not registered the
* transport SHOULD buffer incoming messages.
*
* @return
* Promise of the ActorRef of the actor responsible for handling incoming data.
* Promise that must be completed with the listener responsible for handling incoming data.
*/
def readHandlerPromise: Promise[ActorRef]
def readHandlerPromise: Promise[HandleEventListener]
/**
* Asynchronously sends the specified payload to the remote endpoint. This method must be thread-safe as it might
* be called from different threads. This method must not block.
* Asynchronously sends the specified payload to the remote endpoint. This method MUST be thread-safe as it might
* be called from different threads. This method MUST NOT block.
*
* Writes guarantee ordering of messages, but not their reception. The call to write returns with
* a Boolean indicating if the channel was ready for writes or not. A return value of false indicates that the
* channel is not yet ready for delivery (e.g.: the write buffer is full) and the sender needs to wait
* until the channel becomes ready again. Returning false also means that the current write was dropped (this is
* until the channel becomes ready again. Returning false also means that the current write was dropped (this MUST be
* guaranteed to ensure duplication-free delivery).
*
* @param payload
@ -196,9 +258,10 @@ trait AssociationHandle {
def write(payload: ByteString): Boolean
/**
* Closes the underlying transport link, if needed. Some transport may not need an explicit teardown (UDP) and
* some transports may not support it (hardware connections). Remote endpoint of the channel or connection ''may''
* be notified, but this is not guaranteed.
* Closes the underlying transport link, if needed. Some transports might not need an explicit teardown (UDP) and
* some transports may not support it (hardware connections). Remote endpoint of the channel or connection MAY
* be notified, but this is not guaranteed. The Transport that provides the handle MUST guarantee that disassociate()
* could be called arbitrarily many times.
*
*/
def disassociate(): Unit

View file

@ -1,7 +1,7 @@
package akka.remote.transport.netty
import akka.ConfigurationException
import akka.actor.{ Address, ExtendedActorSystem, ActorRef }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.event.Logging
import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup }
import akka.remote.transport.Transport._
@ -21,6 +21,7 @@ import scala.concurrent.{ ExecutionContext, Promise, Future }
import scala.util.Random
import scala.util.control.NonFatal
import akka.dispatch.ThreadPoolConfig
import akka.remote.transport.AssociationHandle.HandleEventListener
object NettyTransportSettings {
sealed trait Mode
@ -37,7 +38,7 @@ class NettyTransportSettings(config: Config) {
val TransportMode: Mode = getString("transport-protocol") match {
case "tcp" Tcp
case "udp" Udp
case s @ _ throw new ConfigurationException("Unknown transport specified in transport-protocol: " + s)
case s @ _ throw new ConfigurationException("Unknown transport: " + s)
}
val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp)
@ -95,13 +96,15 @@ trait HasTransport {
}
trait CommonHandlers extends NettyHelpers with HasTransport {
import transport.executionContext
final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channels.add(e.getChannel)
protected def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle
protected def registerReader(channel: Channel, readerRef: ActorRef, msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit
protected def registerListener(channel: Channel,
listener: HandleEventListener,
msg: ChannelBuffer,
remoteSocketAddress: InetSocketAddress): Unit
final protected def init(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer)(op: (AssociationHandle Any)): Unit = {
import transport._
@ -109,8 +112,8 @@ trait CommonHandlers extends NettyHelpers with HasTransport {
case (Some(localAddress), Some(remoteAddress))
val handle = createHandle(channel, localAddress, remoteAddress)
handle.readHandlerPromise.future.onSuccess {
case readerRef: ActorRef
registerReader(channel, readerRef, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress])
case listener: HandleEventListener
registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress])
channel.setReadable(true)
}
op(handle)
@ -121,14 +124,15 @@ trait CommonHandlers extends NettyHelpers with HasTransport {
}
abstract class ServerHandler(protected final val transport: NettyTransport,
private final val associationHandlerFuture: Future[ActorRef])
private final val associationListenerFuture: Future[AssociationEventListener])
extends NettyServerHelpers with CommonHandlers with HasTransport {
import transport.executionContext
final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = {
channel.setReadable(false)
associationHandlerFuture.onSuccess {
case ref: ActorRef init(channel, remoteSocketAddress, msg) { ref ! InboundAssociation(_) }
associationListenerFuture.onSuccess {
case listener: AssociationEventListener init(channel, remoteSocketAddress, msg) { listener notify InboundAssociation(_) }
}
}
@ -157,6 +161,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
import NettyTransport._
import settings._
implicit val executionContext: ExecutionContext = system.dispatcher
override val schemeIdentifier: String = TransportMode + (if (EnableSsl) ".ssl" else "")
@ -169,7 +174,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
private val log = Logging(system, this.getClass)
final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, ActorRef]()
final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]()
val channels = new DefaultDisposableChannelGroup("netty-transport-" + Random.nextString(20))
@ -202,13 +207,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
pipeline
}
private val associationHandlerPromise: Promise[ActorRef] = Promise()
private val associationListenerPromise: Promise[AssociationEventListener] = Promise()
private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory {
override def getPipeline: ChannelPipeline = {
val pipeline = newPipeline
if (EnableSsl) pipeline.addFirst("SslHandler", NettySSLSupport(settings.SslSettings.get, log, false))
val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationHandlerPromise.future)
else new TcpServerHandler(NettyTransport.this, associationHandlerPromise.future)
val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationListenerPromise.future)
else new TcpServerHandler(NettyTransport.this, associationListenerPromise.future)
pipeline.addLast("ServerHandler", handler)
pipeline
}
@ -266,8 +271,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
def addressToSocketAddress(addr: Address): InetSocketAddress =
new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get)
override def listen: Future[(Address, Promise[ActorRef])] = {
val listenPromise: Promise[(Address, Promise[ActorRef])] = Promise()
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise()
try {
masterChannel = inboundBootstrap match {
@ -282,12 +287,12 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
addressFromSocketAddress(masterChannel.getLocalAddress, Some(system.name), Some(settings.Hostname)) match {
case Some(address)
val handlerPromise: Promise[ActorRef] = Promise()
listenPromise.success((address, handlerPromise))
val listenerPromise: Promise[AssociationEventListener] = Promise()
listenPromise.success((address, listenerPromise))
localAddress = address
handlerPromise.future.onSuccess {
case ref: ActorRef
associationHandlerPromise.success(ref)
listenerPromise.future.onSuccess {
case listener: AssociationEventListener
associationListenerPromise.success(listener)
masterChannel.setReadable(true)
}
@ -338,7 +343,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
case addr: InetSocketAddress
statusPromise.success(Ready(handle))
handle.readHandlerPromise.future.onSuccess {
case ref: ActorRef udpConnectionTable.put(addr, ref)
case listener: HandleEventListener udpConnectionTable.put(addr, listener)
}
case a @ _ statusPromise.success(Fail(
new NettyTransportException("Unknown remote address type " + a.getClass, null)))

View file

@ -1,48 +1,48 @@
package akka.remote.transport.netty
import akka.actor.{ Address, ActorRef }
import akka.actor.{ Address }
import akka.remote.transport.AssociationHandle
import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload }
import akka.remote.transport.Transport.Status
import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.Transport.{ AssociationEventListener, Status }
import akka.util.ByteString
import java.net.InetSocketAddress
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._
import scala.concurrent.{ Future, Promise }
object ChannelLocalActor extends ChannelLocal[Option[ActorRef]] {
override def initialValue(channel: Channel): Option[ActorRef] = None
def trySend(channel: Channel, msg: Any): Unit = get(channel) foreach { _ ! msg }
object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] {
override def initialValue(channel: Channel): Option[HandleEventListener] = None
def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg }
}
trait TcpHandlers extends CommonHandlers with HasTransport {
import ChannelLocalActor._
override def registerReader(channel: Channel,
readerRef: ActorRef,
override def registerListener(channel: Channel,
listener: HandleEventListener,
msg: ChannelBuffer,
remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(readerRef))
remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener))
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
new TcpAssociationHandle(localAddress, remoteAddress, channel)
override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
trySend(e.getChannel, Disassociated)
notifyListener(e.getChannel, Disassociated)
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
trySend(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())))
notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())))
}
override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) {
trySend(e.getChannel, Disassociated)
notifyListener(e.getChannel, Disassociated)
e.getChannel.close() // No graceful close here
}
}
class TcpServerHandler(_transport: NettyTransport, _associationHandlerFuture: Future[ActorRef])
extends ServerHandler(_transport, _associationHandlerFuture) with TcpHandlers {
class TcpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener])
extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers {
override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
initInbound(e.getChannel, e.getChannel.getRemoteAddress, null)
@ -62,7 +62,7 @@ class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Statu
class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel)
extends AssociationHandle {
override val readHandlerPromise: Promise[ActorRef] = Promise()
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) {
channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer))

View file

@ -1,9 +1,9 @@
package akka.remote.transport.netty
import akka.actor.{ ActorRef, Address }
import akka.actor.{ Address }
import akka.remote.transport.AssociationHandle
import akka.remote.transport.AssociationHandle.InboundPayload
import akka.remote.transport.Transport.Status
import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload }
import akka.remote.transport.Transport.{ AssociationEventListener, Status }
import akka.util.ByteString
import java.net.{ SocketAddress, InetAddress, InetSocketAddress }
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
@ -15,16 +15,16 @@ trait UdpHandlers extends CommonHandlers with HasTransport {
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
new UdpAssociationHandle(localAddress, remoteAddress, channel, transport)
override def registerReader(channel: Channel,
readerRef: ActorRef,
override def registerListener(channel: Channel,
listener: HandleEventListener,
msg: ChannelBuffer,
remoteSocketAddress: InetSocketAddress): Unit = {
val oldReader: ActorRef = transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, readerRef)
val oldReader: HandleEventListener = transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener)
if (oldReader ne null) {
throw new NettyTransportException(s"Reader $readerRef attempted to register for remote address $remoteSocketAddress" +
throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" +
s" but $oldReader was already registered.", null)
}
readerRef ! InboundPayload(ByteString(msg.array()))
listener notify InboundPayload(ByteString(msg.array()))
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
@ -35,8 +35,8 @@ trait UdpHandlers extends CommonHandlers with HasTransport {
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
} else {
val reader = transport.udpConnectionTable.get(inetSocketAddress)
reader ! InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
val listener = transport.udpConnectionTable.get(inetSocketAddress)
listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
}
}
}
@ -44,8 +44,8 @@ trait UdpHandlers extends CommonHandlers with HasTransport {
def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit
}
class UdpServerHandler(_transport: NettyTransport, _associationHandlerFuture: Future[ActorRef])
extends ServerHandler(_transport, _associationHandlerFuture) with UdpHandlers {
class UdpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener])
extends ServerHandler(_transport, _associationListenerFuture) with UdpHandlers {
override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit =
initInbound(channel, remoteSocketAddress, msg)
@ -63,7 +63,7 @@ class UdpAssociationHandle(val localAddress: Address,
private val channel: Channel,
private val transport: NettyTransport) extends AssociationHandle {
override val readHandlerPromise: Promise[ActorRef] = Promise()
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
override def write(payload: ByteString): Boolean = {
if (!channel.isConnected)

View file

@ -38,6 +38,7 @@ object RemoteCommunicationSpec {
class RemoteCommunicationSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.transport = "akka.remote.netty.NettyRemoteTransport"
remote.netty {
hostname = localhost
port = 12345
@ -48,7 +49,7 @@ akka {
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender with DefaultTimeout {
""") with ImplicitSender with DefaultTimeout {
import RemoteCommunicationSpec._

View file

@ -19,9 +19,10 @@ class RemoteConfigSpec extends AkkaSpec(
}
""") {
// These tests are ignored as it tests configuration specific to the old remoting.
"Remoting" must {
"be able to parse generic remote config elements" in {
"be able to parse generic remote config elements" ignore {
val settings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings
import settings._
@ -31,7 +32,7 @@ class RemoteConfigSpec extends AkkaSpec(
LogRemoteLifeCycleEvents must be(true)
}
"be able to parse Netty config elements" in {
"be able to parse Netty config elements" ignore {
val settings =
system.asInstanceOf[ExtendedActorSystem]
.provider.asInstanceOf[RemoteActorRefProvider]

View file

@ -13,17 +13,18 @@ akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/watchers.remote = "akka://other@127.0.0.1:2666"
/watchers.remote = "tcp.akka://other@localhost:2666"
}
}
remote.netty {
hostname = "127.0.0.1"
remoting.tcp {
hostname = "localhost"
port = 0
}
}
""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.port=2666").withFallback(system.settings.config))
val other = ActorSystem("other", ConfigFactory.parseString("akka.remoting.transports.tcp.port=2666")
.withFallback(system.settings.config))
override def atTermination() {
other.shutdown()

View file

@ -18,7 +18,7 @@ object RemoteDeployerSpec {
remote = "akka://sys@wallace:2552"
}
}
akka.remote.netty.port = 0
akka.remoting.transports.tcp.port = 0
""", ConfigParseOptions.defaults)
class RecipeActor extends Actor {

View file

@ -21,7 +21,7 @@ object RemoteRouterSpec {
class RemoteRouterSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
remoting.transports.tcp {
hostname = localhost
port = 0
}
@ -29,7 +29,7 @@ akka {
/blub {
router = round-robin
nr-of-instances = 2
target.nodes = ["akka://remote-sys@localhost:12347"]
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
}
/elastic-blub {
router = round-robin
@ -37,10 +37,10 @@ akka {
lower-bound = 2
upper-bound = 3
}
target.nodes = ["akka://remote-sys@localhost:12347"]
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
}
/remote-blub {
remote = "akka://remote-sys@localhost:12347"
remote = "tcp.akka://remote-sys@localhost:12347"
router = round-robin
nr-of-instances = 2
}
@ -48,12 +48,12 @@ akka {
remote = "akka://RemoteRouterSpec"
router = round-robin
nr-of-instances = 2
target.nodes = ["akka://remote-sys@localhost:12347"]
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
}
/local-blub2 {
router = round-robin
nr-of-instances = 4
target.nodes = ["akka://remote-sys@localhost:12347"]
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
}
}
}
@ -61,7 +61,7 @@ akka {
import RemoteRouterSpec._
val conf = ConfigFactory.parseString("""akka.remote.netty.port=12347
val conf = ConfigFactory.parseString("""akka.remoting.transports.tcp.port=12347
akka.actor.deployment {
/remote-override {
router = round-robin
@ -85,13 +85,13 @@ akka.actor.deployment {
val children = replies.toSet
children must have size 2
children.map(_.parent) must have size 1
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"deploy its children on remote host driven by programatic definition" in {
val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2),
Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub2")
Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub2")
val replies = for (i 1 to 5) yield {
router ! ""
expectMsgType[ActorRef].path
@ -99,7 +99,7 @@ akka.actor.deployment {
val children = replies.toSet
children must have size 2
children.map(_.parent) must have size 1
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
@ -112,13 +112,13 @@ akka.actor.deployment {
val children = replies.toSet
children.size must be >= 2
children.map(_.parent) must have size 1
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"deploy remote routers based on configuration" in {
val router = system.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub")
router.path.address.toString must be("akka://remote-sys@localhost:12347")
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
expectMsgType[ActorRef].path
@ -128,14 +128,14 @@ akka.actor.deployment {
val parents = children.map(_.parent)
parents must have size 1
parents.head must be(router.path)
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"deploy remote routers based on explicit deployment" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "remote-blub2")
router.path.address.toString must be("akka://remote-sys@localhost:12347")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-blub2")
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
expectMsgType[ActorRef].path
@ -145,13 +145,13 @@ akka.actor.deployment {
val parents = children.map(_.parent)
parents must have size 1
parents.head must be(router.path)
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"let remote deployment be overridden by local configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "local-blub")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub")
router.path.address.toString must be("akka://RemoteRouterSpec")
val replies = for (i 1 to 5) yield {
router ! ""
@ -161,15 +161,15 @@ akka.actor.deployment {
children must have size 2
val parents = children.map(_.parent)
parents must have size 1
parents.head.address must be(Address("akka", "remote-sys", "localhost", 12347))
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
parents.head.address must be(Address("tcp.akka", "remote-sys", "localhost", 12347))
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"let remote deployment router be overridden by local configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "local-blub2")
router.path.address.toString must be("akka://remote-sys@localhost:12347")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub2")
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
expectMsgType[ActorRef].path
@ -179,14 +179,14 @@ akka.actor.deployment {
val parents = children.map(_.parent)
parents must have size 1
parents.head must be(router.path)
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
"let remote deployment be overridden by remote configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote-sys@localhost:12347")))), "remote-override")
router.path.address.toString must be("akka://remote-sys@localhost:12347")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-override")
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
expectMsgType[ActorRef].path
@ -196,7 +196,7 @@ akka.actor.deployment {
val parents = children.map(_.parent)
parents must have size 1
parents.head must be(router.path)
children foreach (_.address.toString must be === "akka://remote-sys@localhost:12347")
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
system.stop(router)
}
@ -206,7 +206,7 @@ akka.actor.deployment {
}
val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig(
RoundRobinRouter(1, supervisorStrategy = escalator),
Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub3")
Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub3")
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 1) intercept {

View file

@ -34,31 +34,6 @@ object RemotingSpec {
}
val cfg: Config = ConfigFactory parseString ("""
common-transport-settings {
log-transport-events = true
connection-timeout = 120s
use-dispatcher-for-io = ""
write-buffer-high-water-mark = 0b
write-buffer-low-water-mark = 0b
send-buffer-size = 32000b
receive-buffer-size = 32000b
backlog = 4096
hostname = localhost
enable-ssl = false
server-socket-worker-pool {
pool-size-min = 2
pool-size-factor = 1.0
pool-size-max = 8
}
client-socket-worker-pool {
pool-size-min = 2
pool-size-factor = 1.0
pool-size-max = 8
}
}
common-ssl-settings {
key-store = "%s"
trust-store = "%s"
@ -76,44 +51,21 @@ object RemotingSpec {
remoting.retry-latch-closed-for = 1 s
remoting.log-remote-lifecycle-events = on
remoting.enabled-transports = [test, tcp, udp, ssl]
remoting.transports = [
{
remoting.transports.tcp.port = 12345
remoting.transports.udp.port = 12345
remoting.transports.ssl.port = 23456
remoting.transports.ssl.ssl = ${common-ssl-settings}
remoting.transports.test {
transport-class = "akka.remote.transport.TestTransport"
settings {
applied-adapters = []
registry-key = aX33k0jWKg
local-address = "test://RemotingSpec@localhost:12345"
maximum-payload-bytes = 32000 bytes
scheme-identifier = test
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = tcp
port = 12345
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = udp
port = 12345
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = tcp
enable-ssl = true
port = 23456
ssl = ${common-ssl-settings}
}
}
]
actor.deployment {
/blub.remote = "test.akka://remote-sys@localhost:12346"
@ -137,44 +89,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
val conf = ConfigFactory.parseString(
"""
akka.remote.netty.port=12346
akka.remoting.transports = [
{
transport-class = "akka.remote.transport.TestTransport"
settings {
registry-key = aX33k0jWKg
local-address = "test://remote-sys@localhost:12346"
maximum-payload-bytes = 32000 bytes
scheme-identifier = test
akka.remoting.transports {
tcp.port = 12346
udp.port = 12346
ssl.port = 23457
test.local-address = "test://remote-sys@localhost:12346"
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = tcp
port = 12346
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = udp
port = 12346
}
},
{
transport-class = "akka.remote.transport.netty.NettyTransport"
settings = ${common-transport-settings}
settings {
transport-protocol = tcp
enable-ssl = true
port = 23457
ssl = ${common-ssl-settings}
}
}
]
""").withFallback(system.settings.config).resolve()
val other = ActorSystem("remote-sys", conf)
@ -195,12 +115,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"support remote look-ups" in {
here ! "ping"
expectMsg("pong")
lastSender must be(testActor)
expectMsg(("pong", testActor))
}
"send error message for wrong address" in {
EventFilter.error(start = "AssociationError", occurrences = 1).intercept {
EventFilter.error(start = "AssociationError").intercept {
system.actorFor("test.akka://nonexistingsystem@localhost:12346/user/echo") ! "ping"
}
}

View file

@ -33,7 +33,7 @@ object Configuration {
filter-leeway = 10s
default-timeout = 10s
}
remote.transport = "akka.remote.netty.NettyRemoteTransport"
remote.netty {
hostname = localhost
port = %d

View file

@ -4,38 +4,36 @@ import akka.testkit._
import akka.actor._
import com.typesafe.config._
import scala.concurrent.duration._
import akka.remote.netty.NettyRemoteTransport
import akka.remote.netty.{ SSLSettings, NettyRemoteTransport }
import java.util.ArrayList
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978ConfigSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port = 0
}
}
""") with ImplicitSender with DefaultTimeout {
class Ticket1978ConfigSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {
val cfg = ConfigFactory.parseString("""
ssl-settings {
key-store = "keystore"
trust-store = "truststore"
key-store-password = "changeme"
trust-store-password = "changeme"
protocol = "TLSv1"
random-number-generator = "AES128CounterSecureRNG"
enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
sha1prng-random-source = "/dev/./urandom"
}""")
"SSL Remoting" must {
"be able to parse these extra Netty config elements" in {
val settings =
system.asInstanceOf[ExtendedActorSystem]
.provider.asInstanceOf[RemoteActorRefProvider]
.transport.asInstanceOf[NettyRemoteTransport]
.settings
import settings._
val settings = new SSLSettings(cfg.getConfig("ssl-settings"))
EnableSSL must be(false)
SslSettings.SSLKeyStore must be(Some("keystore"))
SslSettings.SSLKeyStorePassword must be(Some("changeme"))
SslSettings.SSLTrustStore must be(Some("truststore"))
SslSettings.SSLTrustStorePassword must be(Some("changeme"))
SslSettings.SSLProtocol must be(Some("TLSv1"))
SslSettings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
SslSettings.SSLRandomSource must be(None)
SslSettings.SSLRandomNumberGenerator must be(None)
settings.SSLKeyStore must be(Some("keystore"))
settings.SSLKeyStorePassword must be(Some("changeme"))
settings.SSLTrustStore must be(Some("truststore"))
settings.SSLTrustStorePassword must be(Some("changeme"))
settings.SSLProtocol must be(Some("TLSv1"))
settings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
settings.SSLRandomSource must be(Some("/dev/./urandom"))
settings.SSLRandomNumberGenerator must be(Some("AES128CounterSecureRNG"))
}
}
}

View file

@ -25,16 +25,15 @@ import akka.actor.PoisonPill
class UntrustedSpec extends AkkaSpec("""
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.remote.untrusted-mode = on
akka.remote.netty.port = 0
akka.remote.log-remote-lifecycle-events = off
akka.remoting.transports.tcp.port = 0
akka.loglevel = DEBUG
""") with ImplicitSender {
val other = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString("""
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.remote.netty.port = 0
akka.remoting.transports.tcp.port = 0
"""))
val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress
val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head
val target1 = other.actorFor(RootActorPath(addr) / "remote")
val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)

View file

@ -390,7 +390,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
wrappedHandle.readHandlerPromise.success(testActor)
Thread.sleep(100)
Thread.sleep(100) //FIXME: Remove this
reader ! Disassociated

View file

@ -0,0 +1,90 @@
package akka.remote.transport
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import com.typesafe.config.{ Config, ConfigFactory }
import AkkaProtocolStressTest._
import akka.actor._
import scala.concurrent.duration._
object AkkaProtocolStressTest {
val configA: Config = ConfigFactory parseString ("""
akka {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
remoting.retry-latch-closed-for = 0 s
remoting.log-remote-lifecycle-events = on
remoting.failure-detector {
threshold = 1.0
max-sample-size = 2
min-std-deviation = 1 ms
acceptable-heartbeat-pause = 0.01 s
}
remoting.retry-window = 1 s
remoting.maximum-retries-in-window = 1000
remoting.transports.tcp {
applied-adapters = ["gremlin"]
port = 12345
}
}
""")
class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor {
val limit = 10000
var nextSeq = 0
var maxSeq = -1
var losses = 0
def receive = {
case "start" self ! "sendNext"
case "sendNext" if (nextSeq < limit) {
remote ! nextSeq
nextSeq += 1
self ! "sendNext"
}
case seq: Int
if (seq > maxSeq) {
losses += seq - maxSeq - 1
maxSeq = seq
if (seq > limit * 0.9) {
controller ! (maxSeq, losses)
}
} else {
controller ! "Received out of order message. Previous: ${maxSeq} Received: ${seq}"
}
}
}
}
class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346")
.withFallback(system.settings.config).resolve()
val systemB = ActorSystem("systemB", configB)
val remote = systemB.actorOf(Props(new Actor {
def receive = {
case seq: Int sender ! seq
}
}), "echo")
val here = system.actorFor("tcp.gremlin.akka://systemB@localhost:12346/user/echo")
"AkkaProtocolTransport" must {
"guarantee at-most-once delivery and message ordering despite packet loss" taggedAs TimingTest in {
val tester = system.actorOf(Props(new SequenceVerifier(here, self))) ! "start"
expectMsgPF(30 seconds) {
case (received: Int, lost: Int)
log.warning(s" ######## Received ${received - lost} messages from ${received} ########")
}
}
}
override def atTermination(): Unit = systemB.shutdown()
}

View file

@ -0,0 +1,167 @@
package akka.remote.transport
import akka.actor.{ ExtendedActorSystem, Address }
import akka.remote.transport.AssociationHandle.Disassociated
import akka.remote.transport.AssociationHandle.InboundPayload
import akka.remote.transport.TestTransport._
import akka.remote.transport.Transport.Fail
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.transport.Transport.Ready
import akka.remote.transport.Transport.Status
import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec }
import akka.util.ByteString
import scala.concurrent.{ Future, Await }
import akka.remote.RemoteActorRefProvider
abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """)
with DefaultTimeout with ImplicitSender {
def transportName: String
def schemeIdentifier: String
val addressATest: Address = Address("test", "testsytemA", "testhostA", 4321)
val addressBTest: Address = Address("test", "testsytemB", "testhostB", 5432)
val addressA: Address = addressATest.copy(protocol = s"${addressATest.protocol}.$schemeIdentifier")
val addressB: Address = addressBTest.copy(protocol = s"${addressBTest.protocol}.$schemeIdentifier")
val nonExistingAddress = Address("test." + schemeIdentifier, "nosystem", "nohost", 0)
def freshTransport(testTransport: TestTransport): Transport
def wrapTransport(transport: Transport): Transport = if (withAkkaProtocol) {
val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]
new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec)
} else transport
def newTransportA(registry: AssociationRegistry): Transport =
wrapTransport(freshTransport(new TestTransport(addressATest, registry)))
def newTransportB(registry: AssociationRegistry): Transport =
wrapTransport(freshTransport(new TestTransport(addressBTest, registry)))
transportName must {
"return an Address and promise when listen is called" in {
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
val result = Await.result(transportA.listen, timeout.duration)
result._1 must be(addressA)
result._2 must not be null
registry.logSnapshot.exists {
case ListenAttempt(address) address == addressATest
case _ false
} must be(true)
}
"associate successfully with another transport of its kind" in {
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
val transportB = newTransportB(registry)
// Must complete the returned promise to receive events
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(addressATest, addressBTest))
transportA.associate(addressB)
expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
case InboundAssociation(handle) if handle.remoteAddress == addressA
}
registry.logSnapshot.contains(AssociateAttempt(addressATest, addressBTest)) must be(true)
awaitCond(registry.existsAssociation(addressATest, addressBTest))
}
"fail to associate with nonexisting address" in {
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(addressATest))
Await.result(transportA.associate(nonExistingAddress), timeout.duration) match {
case Fail(_)
case _ fail()
}
}
"successfully send PDUs" in {
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
val transportB = newTransportB(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(addressATest, addressBTest))
val associate: Future[Status] = transportA.associate(addressB)
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
case InboundAssociation(handle) if handle.remoteAddress == addressA handle
}
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
val payload = ByteString("PDU")
val pdu = if (withAkkaProtocol) AkkaPduProtobufCodec.constructPayload(payload) else payload
awaitCond(registry.existsAssociation(addressATest, addressBTest))
handleA.write(payload)
expectMsgPF(timeout.duration, "Expect InboundPayload from A") {
case InboundPayload(p) if payload == p
}
registry.logSnapshot.exists {
case WriteAttempt(sender, recipient, sentPdu)
sender == addressATest && recipient == addressBTest && sentPdu == pdu
case _ false
} must be(true)
}
"successfully disassociate" in {
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
val transportB = newTransportB(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(addressATest, addressBTest))
val associate: Future[Status] = transportA.associate(addressB)
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
case InboundAssociation(handle) if handle.remoteAddress == addressA handle
}
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
awaitCond(registry.existsAssociation(addressATest, addressBTest))
handleA.disassociate()
expectMsgPF(timeout.duration) {
case Disassociated
}
awaitCond(!registry.existsAssociation(addressATest, addressBTest))
registry.logSnapshot exists {
case DisassociateAttempt(requester, remote) if requester == addressATest && remote == addressBTest true
case _ false
} must be(true)
}
}
}

View file

@ -10,15 +10,15 @@ import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload }
class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val addressA: Address = Address("akka", "testsytemA", "testhostA", 4321)
val addressB: Address = Address("akka", "testsytemB", "testhostB", 5432)
val nonExistingAddress = Address("akka", "nosystem", "nohost", 0)
val addressA: Address = Address("test", "testsytemA", "testhostA", 4321)
val addressB: Address = Address("test", "testsytemB", "testhostB", 5432)
val nonExistingAddress = Address("test", "nosystem", "nohost", 0)
"TestTransport" must {
"return an Address and promise when listen is called and log calls" in {
val registry = new AssociationRegistry
var transportA = new TestTransport(addressA, registry)
val transportA = new TestTransport(addressA, registry)
val result = Await.result(transportA.listen, timeout.duration)
@ -33,14 +33,14 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
"associate successfully with another TestTransport and log" in {
val registry = new AssociationRegistry
var transportA = new TestTransport(addressA, registry)
var transportB = new TestTransport(addressB, registry)
val transportA = new TestTransport(addressA, registry)
val transportB = new TestTransport(addressB, registry)
// Must complete the returned promise to receive events
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(transportA, transportB))
awaitCond(registry.transportsReady(addressA, addressB))
transportA.associate(addressB)
expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
@ -63,13 +63,13 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
"emulate sending PDUs and logs write" in {
val registry = new AssociationRegistry
var transportA = new TestTransport(addressA, registry)
var transportB = new TestTransport(addressB, registry)
val transportA = new TestTransport(addressA, registry)
val transportB = new TestTransport(addressB, registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(transportA, transportB))
awaitCond(registry.transportsReady(addressA, addressB))
val associate: Future[Status] = transportA.associate(addressB)
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
@ -100,13 +100,13 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
"emulate disassociation and log it" in {
val registry = new AssociationRegistry
var transportA = new TestTransport(addressA, registry)
var transportB = new TestTransport(addressB, registry)
val transportA = new TestTransport(addressA, registry)
val transportB = new TestTransport(addressB, registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
awaitCond(registry.transportsReady(transportA, transportB))
awaitCond(registry.transportsReady(addressA, addressB))
val associate: Future[Status] = transportA.associate(addressB)
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {

View file

@ -0,0 +1,99 @@
package akka.remote.transport
import akka.testkit.AkkaSpec
import akka.remote.transport.ThrottlerTransportAdapter.{ TokenBucket, Unthrottled }
import java.util.concurrent.TimeUnit
class ThrottleModeSpec extends AkkaSpec {
"ThrottleMode" must {
"allow consumption of infinite amount of tokens when untrhottled" in {
val bucket = Unthrottled
bucket.tryConsumeTokens(0, 100) must be((Unthrottled, true))
bucket.tryConsumeTokens(100000, 1000) must be((Unthrottled, true))
bucket.tryConsumeTokens(1000000, 10000) must be((Unthrottled, true))
}
"in tokenbucket mode allow consuming tokens up to capacity" in {
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 100)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 90))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = 0L, 40)
bucket2 must be(TokenBucket(100, 100, 0, 50))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 0L, 50)
bucket3 must be(TokenBucket(100, 100, 0, 0))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 0, 1)
bucket4 must be(TokenBucket(100, 100, 0, 0))
success4 must be(false)
}
"accurately replenish tokens" in {
val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 0)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 0)
bucket1 must be(TokenBucket(100, 100, 0, 0))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 0)
bucket2 must be(TokenBucket(100, 100, halfSecond, 50))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 0)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 100))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 0)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 100))
success4 must be(true)
}
"accurately interleave replenish and consume" in {
val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 10))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 60)
bucket2 must be(TokenBucket(100, 100, halfSecond, 0))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 40)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 70)
bucket4 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success4 must be(false)
}
"allow oversized packets through by loaning" in {
val halfSecond: Long = TimeUnit.MILLISECONDS.toNanos(500)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 30)
bucket1 must be(TokenBucket(100, 100, 0, 20))
success1 must be(false)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 110)
bucket2 must be(TokenBucket(100, 100, halfSecond, -40))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 20)
bucket3 must be(TokenBucket(100, 100, halfSecond, -40))
success3 must be(false)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 20)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 40))
success4 must be(true)
}
}
}

View file

@ -0,0 +1,94 @@
package akka.remote.transport
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor._
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import ThrottlerTransportAdapterSpec._
import scala.concurrent.duration._
import akka.remote.transport.TestTransport.{ WriteAttempt, AssociationRegistry }
import scala.concurrent.{ Promise, Future, Await }
import akka.remote.transport.Transport.{ Ready, InboundAssociation, Status }
import akka.util.ByteString
import akka.remote.transport.AssociationHandle.InboundPayload
import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle }
import akka.remote.RemoteActorRefProvider
object ThrottlerTransportAdapterSpec {
val configA: Config = ConfigFactory parseString ("""
akka {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
remoting.retry-latch-closed-for = 0 s
remoting.log-remote-lifecycle-events = on
remoting.transports.tcp.applied-adapters = ["trttl"]
remoting.transports.tcp.port = 12345
}
""")
class Echo extends Actor {
override def receive = {
case "ping" sender ! "pong"
}
}
val PingPacketSize = 148
val MessageCount = 100
val BytesPerSecond = 500
val TotalTime = (MessageCount * PingPacketSize) / BytesPerSecond
class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor {
var messageCount = MessageCount
var received = 0
var startTime = 0L
override def receive = {
case "start"
self ! "sendNext"
startTime = System.nanoTime()
case "sendNext" if (messageCount > 0) {
remote ! "ping"
self ! "sendNext"
messageCount -= 1
}
case "pong"
received += 1
if (received >= MessageCount) controller ! (System.nanoTime() - startTime)
}
}
}
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346")
.withFallback(system.settings.config).resolve()
val systemB = ActorSystem("systemB", configB)
val remote = systemB.actorOf(Props[Echo], "echo")
val here = system.actorFor("tcp.trttl.akka://systemB@localhost:12346/user/echo")
"ThrottlerTransportAdapter" must {
"maintain average message rate" taggedAs TimingTest in {
Await.result(
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
.managementCommand(SetThrottle(Address("akka", "systemB", "localhost", 12346), Direction.Send, TokenBucket(200, 500, 0, 0))), 3 seconds)
val tester = system.actorOf(Props(new ThrottlingTester(here, self))) ! "start"
expectMsgPF((TotalTime + 3) seconds) {
case time: Long log.warning("Total time of transmission: " + NANOSECONDS.toSeconds(time))
}
}
}
override def atTermination(): Unit = systemB.shutdown()
}
class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {
def transportName = "ThrottlerTransportAdapter"
def schemeIdentifier = "trttl.akka"
def freshTransport(testTransport: TestTransport) =
new ThrottlerTransportAdapter(testTransport, system.asInstanceOf[ExtendedActorSystem])
}