Clean shutdown of ReliableProxySpec, see #2846

* Solved by adding missing Dequeue in throttler
* Changed to FSM timers in throttler
* Some boy scouting
This commit is contained in:
Patrik Nordwall 2013-01-03 14:51:41 +01:00
parent 3ceb02f846
commit 48c6374721
7 changed files with 105 additions and 103 deletions

View file

@ -37,8 +37,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
override def afterEach {
runOn(local) {
testConductor.throttle(local, remote, Direction.Both, -1).await
testConductor.passThrough(local, remote, Direction.Both).await
}
enterBarrier("after-each")
}
@volatile var target: ActorRef = system.deadLetters
@ -46,8 +47,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))
def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))
def sendN(n: Int) = (1 to n) foreach (proxy ! _)
def sendN(n: Int) = (1 to n) foreach (proxy ! _)
def expectN(n: Int) = (1 to n) foreach { n expectMsg(n); lastSender must be === target }
"A ReliableProxy" must {
@ -82,6 +83,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
runOn(remote) {
expectMsg("hello")
}
enterBarrier("initialize-done")
}
"forward messages in sequence" in {
@ -95,9 +98,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
enterBarrier("test1a")
runOn(local) {
sendN(100)
expectTransition(Idle, Active)
@ -108,7 +111,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
enterBarrier("test1b")
}
@ -121,17 +124,17 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectNoMsg
}
}
enterBarrier("test2a")
runOn(remote) {
expectNoMsg(0 seconds)
}
enterBarrier("test2b")
runOn(local) {
testConductor.throttle(local, remote, Direction.Send, -1).await
testConductor.passThrough(local, remote, Direction.Send).await
within(5 seconds) { expectTransition(Active, Idle) }
}
runOn(remote) {
@ -139,7 +142,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
enterBarrier("test2c")
}
@ -157,14 +160,14 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
enterBarrier("test3a")
runOn(local) {
testConductor.throttle(local, remote, Direction.Receive, -1).await
testConductor.passThrough(local, remote, Direction.Receive).await
within(5 seconds) { expectTransition(Active, Idle) }
}
enterBarrier("test3b")
}
@ -182,11 +185,11 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(50)
}
}
enterBarrier("test4a")
runOn(local) {
testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await
testConductor.passThrough(local, remote, Direction.Send).await
testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await
sendN(50)
within(5 seconds) {
@ -199,8 +202,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(50)
}
}
enterBarrier("test4a")
enterBarrier("test4b")
}
}

View file

@ -207,8 +207,8 @@ surprising ways.
* Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break.
* To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport``
by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
* To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the failure injector and
throttler transport adapters by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
* Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller.

View file

@ -94,8 +94,8 @@ trait Conductor { this: TestConductorExt ⇒
* increased latency.
*
* ====Note====
* To use this feature you must activate the `TestConductorTranport`
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
* To use this feature you must activate the failure injector and throttler
* transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be throttled
@ -115,18 +115,15 @@ trait Conductor { this: TestConductorExt ⇒
* Socket.
*
* ====Note====
* To use this feature you must activate the `TestConductorTranport`
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
* To use this feature you must activate the failure injector and throttler
* transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
import Settings.QueryTimeout
requireTestConductorTranport()
controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
}
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] =
throttle(node, target, direction, 0f)
private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
@ -137,18 +134,15 @@ trait Conductor { this: TestConductorExt ⇒
* sending and/or receiving.
*
* ====Note====
* To use this feature you must activate the `TestConductorTranport`
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
* To use this feature you must activate the failure injector and throttler
* transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
import Settings.QueryTimeout
requireTestConductorTranport()
controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
}
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] =
throttle(node, target, direction, -1f)
/**
* Tell the remote support to shutdown the connection to the given remote

View file

@ -36,8 +36,8 @@ object TestConductor extends ExtensionKey[TestConductorExt] {
* to be a [[akka.remote.RemoteActorRefProvider]].
*
* To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the
* `TestConductorTranport` by specifying `testTransport(on = true)` in your
* MultiNodeConfig.
* failure injector and throttler transport adapters by specifying `testTransport(on = true)`
* in your MultiNodeConfig.
*
*/
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {

View file

@ -87,8 +87,8 @@ abstract class MultiNodeConfig {
/**
* To be able to use `blackhole`, `passThrough`, and `throttle` you must
* activate the TestConductorTranport by specifying
* `testTransport(on = true)` in your MultiNodeConfig.
* activate the failure injector and throttler transport adapters by
* specifying `testTransport(on = true)` in your MultiNodeConfig.
*/
def testTransport(on: Boolean): Unit = _testTransport = on

View file

@ -59,43 +59,42 @@ object ThrottlerTransportAdapter {
case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode)
sealed trait ThrottleMode {
def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
def timeToAvailable(currentTime: Long, tokens: Int): Long
def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration
}
case class TokenBucket(capacity: Int, tokensPerSecond: Double, lastSend: Long, availableTokens: Int)
case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int)
extends ThrottleMode {
private def isAvailable(timeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
true // Allow messages larger than capacity through, it will be recorded as negative tokens
} else min((availableTokens + tokensGenerated(timeOfSend)), capacity) >= tokens
} else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
if (isAvailable(timeOfSend, tokens))
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
if (isAvailable(nanoTimeOfSend, tokens))
(this.copy(
lastSend = timeOfSend,
availableTokens = min(availableTokens - tokens + tokensGenerated(timeOfSend), capacity)), true)
nanoTimeOfLastSend = nanoTimeOfSend,
availableTokens = min(availableTokens - tokens + tokensGenerated(nanoTimeOfSend), capacity)), true)
else (this, false)
}
override def timeToAvailable(currentTime: Long, tokens: Int): Long = {
val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime)
TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong)
override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = {
val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentNanoTime)
(needed / tokensPerSecond).seconds
}
private def tokensGenerated(timeOfSend: Long): Int =
(TimeUnit.NANOSECONDS.toMillis(timeOfSend - lastSend) * tokensPerSecond / 1000.0).toInt
private def tokensGenerated(nanoTimeOfSend: Long): Int =
(TimeUnit.NANOSECONDS.toMillis(nanoTimeOfSend - nanoTimeOfLastSend) * tokensPerSecond / 1000.0).toInt
}
case object Unthrottled extends ThrottleMode {
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
}
case object Blackhole extends ThrottleMode {
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
override def timeToAvailable(currentTime: Long, tokens: Int): Long = 0L
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
}
}
@ -215,6 +214,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
object ThrottledAssociation {
private final val DequeueTimerName = "dequeue"
case object Dequeue
sealed trait ThrottlerState
@ -253,7 +254,7 @@ private[transport] class ThrottledAssociation(
import context.dispatcher
var inboundThrottleMode: ThrottleMode = _
var queue = Queue.empty[ByteString]
var throttledMessages = Queue.empty[ByteString]
var upstreamListener: HandleEventListener = _
override def postStop(): Unit = originalHandle.disassociate()
@ -272,7 +273,7 @@ private[transport] class ThrottledAssociation(
when(WaitOrigin) {
case Event(InboundPayload(p), ExposedHandle(exposedHandle))
queue = queue enqueue p
throttledMessages = throttledMessages enqueue p
peekOrigin(p) match {
case Some(origin)
manager ! Checkin(origin, exposedHandle)
@ -283,12 +284,12 @@ private[transport] class ThrottledAssociation(
when(WaitMode) {
case Event(InboundPayload(p), _)
queue = queue enqueue p
throttledMessages = throttledMessages enqueue p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
queue = Queue.empty[ByteString]
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
} else {
@ -300,7 +301,7 @@ private[transport] class ThrottledAssociation(
when(WaitUpstreamListener) {
case Event(InboundPayload(p), _)
queue = queue enqueue p
throttledMessages = throttledMessages enqueue p
stay()
case Event(listener: HandleEventListener, _)
upstreamListener = listener
@ -315,30 +316,30 @@ private[transport] class ThrottledAssociation(
self ! Dequeue
goto(Throttling)
case Event(InboundPayload(p), _)
queue = queue enqueue p
throttledMessages = throttledMessages enqueue p
stay()
}
when(Throttling) {
case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString]
if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)
stay()
case Event(Dequeue, _)
if (!queue.isEmpty) {
val (payload, newqueue) = queue.dequeue
if (throttledMessages.nonEmpty) {
val (payload, newqueue) = throttledMessages.dequeue
upstreamListener notify InboundPayload(payload)
queue = newqueue
throttledMessages = newqueue
inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1
if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue
else if (!queue.isEmpty) {
context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length).nanos, self, Dequeue)
}
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
}
stay()
@ -370,24 +371,28 @@ private[transport] class ThrottledAssociation(
if (inboundThrottleMode == Blackhole) {
// Do nothing
} else {
if (queue.isEmpty) {
if (throttledMessages.isEmpty) {
val tokens = payload.length
val (newbucket, success) = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), tokens)
if (success) {
inboundThrottleMode = newbucket
upstreamListener notify InboundPayload(payload)
} else {
queue = queue.enqueue(payload)
context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens).nanos, self, Dequeue)
throttledMessages = throttledMessages.enqueue(payload)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens))
}
} else {
queue = queue.enqueue(payload)
throttledMessages = throttledMessages.enqueue(payload)
}
}
}
def scheduleDequeue(delay: FiniteDuration): Unit = inboundThrottleMode match {
case Blackhole // Do nothing
case _ if delay <= Duration.Zero self ! Dequeue
case _ setTimer(DequeueTimerName, Dequeue, delay, repeat = false)
}
}
private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef)

View file

@ -18,77 +18,77 @@ class ThrottleModeSpec extends AkkaSpec {
}
"in tokenbucket mode allow consuming tokens up to capacity" in {
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 100)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 100)
val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 90))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = 0L, 40)
val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = 0L, 40)
bucket2 must be(TokenBucket(100, 100, 0, 50))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 0L, 50)
val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 0L, 50)
bucket3 must be(TokenBucket(100, 100, 0, 0))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 0, 1)
val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 0, 1)
bucket4 must be(TokenBucket(100, 100, 0, 0))
success4 must be(false)
}
"accurately replenish tokens" in {
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 0)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 0)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 0)
val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 0)
bucket1 must be(TokenBucket(100, 100, 0, 0))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 0)
val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 0)
bucket2 must be(TokenBucket(100, 100, halfSecond, 50))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 0)
val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 0)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 100))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 0)
val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 0)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 100))
success4 must be(true)
}
"accurately interleave replenish and consume" in {
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 10))
success1 must be(true)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 60)
val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 60)
bucket2 must be(TokenBucket(100, 100, halfSecond, 0))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 40)
val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 40)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success3 must be(true)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 70)
val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 70)
bucket4 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success4 must be(false)
}
"allow oversized packets through by loaning" in {
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 30)
val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20)
val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 30)
bucket1 must be(TokenBucket(100, 100, 0, 20))
success1 must be(false)
val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 110)
val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 110)
bucket2 must be(TokenBucket(100, 100, halfSecond, -40))
success2 must be(true)
val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 20)
val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 20)
bucket3 must be(TokenBucket(100, 100, halfSecond, -40))
success3 must be(false)
val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 20)
val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 20)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 40))
success4 must be(true)
}