From f18575e25166c86b3549d1b7a60b0ea99fafbfbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 20 Mar 2013 10:18:24 +0100 Subject: [PATCH] Fixes and additions to the ThrottlerTransportAdapter. See #2930 --- .../akka/remote/testconductor/Player.scala | 2 +- .../transport/ThrottlerTransportAdapter.scala | 38 +++++++++--- .../ThrottlerTransportAdapterSpec.scala | 62 ++++++++++++++++--- 3 files changed, 86 insertions(+), 16 deletions(-) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 0ed0130c1e..384d8cad38 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -227,7 +227,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode)) cmdFuture onSuccess { - case b: Boolean ⇒ self ! ToServer(Done) + case true ⇒ self ! ToServer(Done) case _ ⇒ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " + "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig") } diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index c7a0d7991f..2726d71191 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -142,6 +142,20 @@ object ThrottlerTransportAdapter { */ def getInstance = this } + + /** + * Management Command to force dissocation of an address. + */ + @SerialVersionUID(1L) + case class ForceDissociate(address: Address) + + @SerialVersionUID(1L) + case object ForceDissociateAck { + /** + * Java API: get the singleton instance + */ + def getInstance = this + } } class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem) @@ -155,11 +169,15 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA Props(new ThrottlerManager(wt)) } - override def managementCommand(cmd: Any): Future[Boolean] = cmd match { - case s: SetThrottle ⇒ - import ActorTransportAdapter.AskTimeout - manager ? s map { case SetThrottleAck ⇒ true } - case _ ⇒ wrappedTransport.managementCommand(cmd) + override def managementCommand(cmd: Any): Future[Boolean] = { + import ActorTransportAdapter.AskTimeout + cmd match { + case s: SetThrottle ⇒ + manager ? s map { case SetThrottleAck ⇒ true } + case f: ForceDissociate ⇒ + manager ? f map { case ForceDissociateAck ⇒ true } + case _ ⇒ wrappedTransport.managementCommand(cmd) + } } } @@ -212,8 +230,14 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A case (`naked`, handle) ⇒ setMode(handle, mode, direction) case _ ⇒ ok } - Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender + case ForceDissociate(address) ⇒ + val naked = nakedAddress(address) + handleTable.foreach { + case (`naked`, handle) ⇒ handle.disassociate() + case _ ⇒ + } + sender ! ForceDissociateAck case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) @@ -247,7 +271,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A import ActorTransportAdapter.AskTimeout if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) - if (direction.includes(Direction.Receive)) + if (direction.includes(Direction.Receive) && !handle.throttlerActor.isTerminated) (handle.throttlerActor ? mode).mapTo[SetThrottleAck.type] else Future.successful(SetThrottleAck) diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index f7f364d1be..9f31b5a271 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -6,7 +6,7 @@ import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec } import ThrottlerTransportAdapterSpec._ import scala.concurrent.duration._ import scala.concurrent.Await -import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle } +import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.RemoteActorRefProvider import akka.testkit.TestEvent import akka.testkit.EventFilter @@ -18,6 +18,7 @@ object ThrottlerTransportAdapterSpec { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty.tcp.hostname = "localhost" remote.retry-latch-closed-for = 0 s remote.log-remote-lifecycle-events = on @@ -29,13 +30,14 @@ object ThrottlerTransportAdapterSpec { class Echo extends Actor { override def receive = { case "ping" ⇒ sender ! "pong" + case x ⇒ sender ! x } } val PingPacketSize = 148 - val MessageCount = 100 + val MessageCount = 30 val BytesPerSecond = 500 - val TotalTime = (MessageCount * PingPacketSize) / BytesPerSecond + val TotalTime: Long = (MessageCount * PingPacketSize) / BytesPerSecond class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor { var messageCount = MessageCount @@ -66,17 +68,61 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress) val here = system.actorFor(rootB / "user" / "echo") + def throttle(direction: Direction, mode: ThrottleMode): Boolean = { + val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get) + val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport + Await.result(transport.managementCommand(SetThrottle(rootBAddress, direction, mode)), 3.seconds) + } + + def dissociate(): Boolean = { + val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get) + val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport + Await.result(transport.managementCommand(ForceDissociate(rootBAddress)), 3.seconds) + } + "ThrottlerTransportAdapter" must { "maintain average message rate" taggedAs TimingTest in { - Await.result( - system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport - .managementCommand(SetThrottle(Address("akka", "systemB", "localhost", rootB.address.port.get), Direction.Send, TokenBucket(200, 500, 0, 0))), 3.seconds) + throttle(Direction.Send, TokenBucket(200, 500, 0, 0)) must be(true) val tester = system.actorOf(Props(new ThrottlingTester(here, self))) ! "start" - expectMsgPF((TotalTime + 3).seconds) { - case time: Long ⇒ log.warning("Total time of transmission: " + NANOSECONDS.toSeconds(time)) + val time = NANOSECONDS.toSeconds(expectMsgType[Long]((TotalTime + 3).seconds)) + log.warning("Total time of transmission: " + time) + time must be > (TotalTime - 3) + throttle(Direction.Send, Unthrottled) must be(true) + } + + "must survive blackholing" taggedAs TimingTest in { + here ! "Blackhole 1" + expectMsg("Blackhole 1") + + throttle(Direction.Both, Blackhole) must be(true) + + here ! "Blackhole 2" + expectNoMsg(1.seconds) + dissociate() must be(true) + expectNoMsg(1.seconds) + + throttle(Direction.Both, Unthrottled) must be(true) + + // after we remove the Blackhole we can't be certain of the state + // of the connection, repeat until success + here ! "Blackhole 3" + awaitCond({ + if (receiveOne(Duration.Zero) == "Blackhole 3") + true + else { + here ! "Blackhole 3" + false + } + }, 5.seconds) + + here ! "Cleanup" + fishForMessage(5.seconds) { + case "Cleanup" ⇒ true + case "Blackhole 3" ⇒ false } } + } override def beforeTermination() {