Fixes and additions to the ThrottlerTransportAdapter. See #2930

This commit is contained in:
Björn Antonsson 2013-03-20 10:18:24 +01:00
parent 83541e8abf
commit f18575e251
3 changed files with 86 additions and 16 deletions

View file

@ -227,7 +227,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode))
cmdFuture onSuccess {
case b: Boolean self ! ToServer(Done)
case true self ! ToServer(Done)
case _ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " +
"adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig")
}

View file

@ -142,6 +142,20 @@ object ThrottlerTransportAdapter {
*/
def getInstance = this
}
/**
* Management Command to force dissocation of an address.
*/
@SerialVersionUID(1L)
case class ForceDissociate(address: Address)
@SerialVersionUID(1L)
case object ForceDissociateAck {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
}
class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem)
@ -155,12 +169,16 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
Props(new ThrottlerManager(wt))
}
override def managementCommand(cmd: Any): Future[Boolean] = cmd match {
case s: SetThrottle
override def managementCommand(cmd: Any): Future[Boolean] = {
import ActorTransportAdapter.AskTimeout
cmd match {
case s: SetThrottle
manager ? s map { case SetThrottleAck true }
case f: ForceDissociate
manager ? f map { case ForceDissociateAck true }
case _ wrappedTransport.managementCommand(cmd)
}
}
}
/**
@ -212,8 +230,14 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
case (`naked`, handle) setMode(handle, mode, direction)
case _ ok
}
Future.sequence(allAcks).map(_ SetThrottleAck) pipeTo sender
case ForceDissociate(address)
val naked = nakedAddress(address)
handleTable.foreach {
case (`naked`, handle) handle.disassociate()
case _
}
sender ! ForceDissociateAck
case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
@ -247,7 +271,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
import ActorTransportAdapter.AskTimeout
if (direction.includes(Direction.Send))
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
if (direction.includes(Direction.Receive) && !handle.throttlerActor.isTerminated)
(handle.throttlerActor ? mode).mapTo[SetThrottleAck.type]
else
Future.successful(SetThrottleAck)

View file

@ -6,7 +6,7 @@ import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import ThrottlerTransportAdapterSpec._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle }
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.RemoteActorRefProvider
import akka.testkit.TestEvent
import akka.testkit.EventFilter
@ -18,6 +18,7 @@ object ThrottlerTransportAdapterSpec {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp.hostname = "localhost"
remote.retry-latch-closed-for = 0 s
remote.log-remote-lifecycle-events = on
@ -29,13 +30,14 @@ object ThrottlerTransportAdapterSpec {
class Echo extends Actor {
override def receive = {
case "ping" sender ! "pong"
case x sender ! x
}
}
val PingPacketSize = 148
val MessageCount = 100
val MessageCount = 30
val BytesPerSecond = 500
val TotalTime = (MessageCount * PingPacketSize) / BytesPerSecond
val TotalTime: Long = (MessageCount * PingPacketSize) / BytesPerSecond
class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor {
var messageCount = MessageCount
@ -66,17 +68,61 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)
val here = system.actorFor(rootB / "user" / "echo")
def throttle(direction: Direction, mode: ThrottleMode): Boolean = {
val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
Await.result(transport.managementCommand(SetThrottle(rootBAddress, direction, mode)), 3.seconds)
}
def dissociate(): Boolean = {
val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
Await.result(transport.managementCommand(ForceDissociate(rootBAddress)), 3.seconds)
}
"ThrottlerTransportAdapter" must {
"maintain average message rate" taggedAs TimingTest in {
Await.result(
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
.managementCommand(SetThrottle(Address("akka", "systemB", "localhost", rootB.address.port.get), Direction.Send, TokenBucket(200, 500, 0, 0))), 3.seconds)
throttle(Direction.Send, TokenBucket(200, 500, 0, 0)) must be(true)
val tester = system.actorOf(Props(new ThrottlingTester(here, self))) ! "start"
expectMsgPF((TotalTime + 3).seconds) {
case time: Long log.warning("Total time of transmission: " + NANOSECONDS.toSeconds(time))
val time = NANOSECONDS.toSeconds(expectMsgType[Long]((TotalTime + 3).seconds))
log.warning("Total time of transmission: " + time)
time must be > (TotalTime - 3)
throttle(Direction.Send, Unthrottled) must be(true)
}
"must survive blackholing" taggedAs TimingTest in {
here ! "Blackhole 1"
expectMsg("Blackhole 1")
throttle(Direction.Both, Blackhole) must be(true)
here ! "Blackhole 2"
expectNoMsg(1.seconds)
dissociate() must be(true)
expectNoMsg(1.seconds)
throttle(Direction.Both, Unthrottled) must be(true)
// after we remove the Blackhole we can't be certain of the state
// of the connection, repeat until success
here ! "Blackhole 3"
awaitCond({
if (receiveOne(Duration.Zero) == "Blackhole 3")
true
else {
here ! "Blackhole 3"
false
}
}, 5.seconds)
here ! "Cleanup"
fishForMessage(5.seconds) {
case "Cleanup" true
case "Blackhole 3" false
}
}
}
override def beforeTermination() {