diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index deef1871c2..56d795c83b 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ -37,8 +37,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod override def afterEach { runOn(local) { - testConductor.throttle(local, remote, Direction.Both, -1).await + testConductor.passThrough(local, remote, Direction.Both).await } + enterBarrier("after-each") } @volatile var target: ActorRef = system.deadLetters @@ -46,8 +47,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s)) def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2)) - - def sendN(n: Int) = (1 to n) foreach (proxy ! _) + + def sendN(n: Int) = (1 to n) foreach (proxy ! _) def expectN(n: Int) = (1 to n) foreach { n ⇒ expectMsg(n); lastSender must be === target } "A ReliableProxy" must { @@ -82,6 +83,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod runOn(remote) { expectMsg("hello") } + + enterBarrier("initialize-done") } "forward messages in sequence" in { @@ -95,9 +98,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(100) } } - + enterBarrier("test1a") - + runOn(local) { sendN(100) expectTransition(Idle, Active) @@ -108,7 +111,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(100) } } - + enterBarrier("test1b") } @@ -121,17 +124,17 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectNoMsg } } - + enterBarrier("test2a") - + runOn(remote) { expectNoMsg(0 seconds) } - + enterBarrier("test2b") - + runOn(local) { - testConductor.throttle(local, remote, Direction.Send, -1).await + testConductor.passThrough(local, remote, Direction.Send).await within(5 seconds) { expectTransition(Active, Idle) } } runOn(remote) { @@ -139,7 +142,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(100) } } - + enterBarrier("test2c") } @@ -157,14 +160,14 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(100) } } - + enterBarrier("test3a") - + runOn(local) { - testConductor.throttle(local, remote, Direction.Receive, -1).await + testConductor.passThrough(local, remote, Direction.Receive).await within(5 seconds) { expectTransition(Active, Idle) } } - + enterBarrier("test3b") } @@ -182,11 +185,11 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(50) } } - + enterBarrier("test4a") - + runOn(local) { - testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await + testConductor.passThrough(local, remote, Direction.Send).await testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await sendN(50) within(5 seconds) { @@ -199,8 +202,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod expectN(50) } } - - enterBarrier("test4a") + + enterBarrier("test4b") } } diff --git a/akka-docs/rst/dev/multi-node-testing.rst b/akka-docs/rst/dev/multi-node-testing.rst index b098317054..eb293eaeb8 100644 --- a/akka-docs/rst/dev/multi-node-testing.rst +++ b/akka-docs/rst/dev/multi-node-testing.rst @@ -207,8 +207,8 @@ surprising ways. * Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break. - * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport`` - by specifying ``testTransport(on = true)`` in your MultiNodeConfig. + * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the failure injector and + throttler transport adapters by specifying ``testTransport(on = true)`` in your MultiNodeConfig. * Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller. diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 31d05b8a71..ea14dccc57 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -94,8 +94,8 @@ trait Conductor { this: TestConductorExt ⇒ * increased latency. * * ====Note==== - * To use this feature you must activate the `TestConductorTranport` - * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * To use this feature you must activate the failure injector and throttler + * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig. * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be throttled @@ -115,18 +115,15 @@ trait Conductor { this: TestConductorExt ⇒ * Socket. * * ====Note==== - * To use this feature you must activate the `TestConductorTranport` - * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * To use this feature you must activate the failure injector and throttler + * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig. * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` */ - def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { - import Settings.QueryTimeout - requireTestConductorTranport() - controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done] - } + def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = + throttle(node, target, direction, 0f) private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + @@ -137,18 +134,15 @@ trait Conductor { this: TestConductorExt ⇒ * sending and/or receiving. * * ====Note==== - * To use this feature you must activate the `TestConductorTranport` - * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * To use this feature you must activate the failure injector and throttler + * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig. * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` */ - def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { - import Settings.QueryTimeout - requireTestConductorTranport() - controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done] - } + def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = + throttle(node, target, direction, -1f) /** * Tell the remote support to shutdown the connection to the given remote diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index 4a054fdff4..3ad3027331 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -36,8 +36,8 @@ object TestConductor extends ExtensionKey[TestConductorExt] { * to be a [[akka.remote.RemoteActorRefProvider]]. * * To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the - * `TestConductorTranport` by specifying `testTransport(on = true)` in your - * MultiNodeConfig. + * failure injector and throttler transport adapters by specifying `testTransport(on = true)` + * in your MultiNodeConfig. * */ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index ec926dd3af..f390c1b32f 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -87,8 +87,8 @@ abstract class MultiNodeConfig { /** * To be able to use `blackhole`, `passThrough`, and `throttle` you must - * activate the TestConductorTranport by specifying - * `testTransport(on = true)` in your MultiNodeConfig. + * activate the failure injector and throttler transport adapters by + * specifying `testTransport(on = true)` in your MultiNodeConfig. */ def testTransport(on: Boolean): Unit = _testTransport = on diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index d76bbbf071..82143f483a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -59,43 +59,42 @@ object ThrottlerTransportAdapter { case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode) sealed trait ThrottleMode { - def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) - def timeToAvailable(currentTime: Long, tokens: Int): Long + def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) + def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration } - case class TokenBucket(capacity: Int, tokensPerSecond: Double, lastSend: Long, availableTokens: Int) + case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int) extends ThrottleMode { - private def isAvailable(timeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) { + private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) { true // Allow messages larger than capacity through, it will be recorded as negative tokens - } else min((availableTokens + tokensGenerated(timeOfSend)), capacity) >= tokens + } else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens - override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = { - if (isAvailable(timeOfSend, tokens)) + override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = { + if (isAvailable(nanoTimeOfSend, tokens)) (this.copy( - lastSend = timeOfSend, - availableTokens = min(availableTokens - tokens + tokensGenerated(timeOfSend), capacity)), true) + nanoTimeOfLastSend = nanoTimeOfSend, + availableTokens = min(availableTokens - tokens + tokensGenerated(nanoTimeOfSend), capacity)), true) else (this, false) } - override def timeToAvailable(currentTime: Long, tokens: Int): Long = { - val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime) - TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong) + override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = { + val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentNanoTime) + (needed / tokensPerSecond).seconds } - private def tokensGenerated(timeOfSend: Long): Int = - (TimeUnit.NANOSECONDS.toMillis(timeOfSend - lastSend) * tokensPerSecond / 1000.0).toInt + private def tokensGenerated(nanoTimeOfSend: Long): Int = + (TimeUnit.NANOSECONDS.toMillis(nanoTimeOfSend - nanoTimeOfLastSend) * tokensPerSecond / 1000.0).toInt } case object Unthrottled extends ThrottleMode { - - override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true) - override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L + override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true) + override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero } case object Blackhole extends ThrottleMode { - override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false) - override def timeToAvailable(currentTime: Long, tokens: Int): Long = 0L + override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false) + override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero } } @@ -215,6 +214,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } object ThrottledAssociation { + private final val DequeueTimerName = "dequeue" + case object Dequeue sealed trait ThrottlerState @@ -253,7 +254,7 @@ private[transport] class ThrottledAssociation( import context.dispatcher var inboundThrottleMode: ThrottleMode = _ - var queue = Queue.empty[ByteString] + var throttledMessages = Queue.empty[ByteString] var upstreamListener: HandleEventListener = _ override def postStop(): Unit = originalHandle.disassociate() @@ -272,7 +273,7 @@ private[transport] class ThrottledAssociation( when(WaitOrigin) { case Event(InboundPayload(p), ExposedHandle(exposedHandle)) ⇒ - queue = queue enqueue p + throttledMessages = throttledMessages enqueue p peekOrigin(p) match { case Some(origin) ⇒ manager ! Checkin(origin, exposedHandle) @@ -283,12 +284,12 @@ private[transport] class ThrottledAssociation( when(WaitMode) { case Event(InboundPayload(p), _) ⇒ - queue = queue enqueue p + throttledMessages = throttledMessages enqueue p stay() case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) ⇒ inboundThrottleMode = mode if (inboundThrottleMode == Blackhole) { - queue = Queue.empty[ByteString] + throttledMessages = Queue.empty[ByteString] exposedHandle.disassociate() stop() } else { @@ -300,7 +301,7 @@ private[transport] class ThrottledAssociation( when(WaitUpstreamListener) { case Event(InboundPayload(p), _) ⇒ - queue = queue enqueue p + throttledMessages = throttledMessages enqueue p stay() case Event(listener: HandleEventListener, _) ⇒ upstreamListener = listener @@ -315,30 +316,30 @@ private[transport] class ThrottledAssociation( self ! Dequeue goto(Throttling) case Event(InboundPayload(p), _) ⇒ - queue = queue enqueue p + throttledMessages = throttledMessages enqueue p stay() } when(Throttling) { case Event(mode: ThrottleMode, _) ⇒ inboundThrottleMode = mode - if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString] + if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString] + cancelTimer(DequeueTimerName) + if (throttledMessages.nonEmpty) + scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) stay() case Event(InboundPayload(p), _) ⇒ forwardOrDelay(p) stay() case Event(Dequeue, _) ⇒ - if (!queue.isEmpty) { - val (payload, newqueue) = queue.dequeue + if (throttledMessages.nonEmpty) { + val (payload, newqueue) = throttledMessages.dequeue upstreamListener notify InboundPayload(payload) - queue = newqueue + throttledMessages = newqueue inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1 - if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue - else if (!queue.isEmpty) { - context.system.scheduler.scheduleOnce( - inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length).nanos, self, Dequeue) - } + if (throttledMessages.nonEmpty) + scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length)) } stay() @@ -370,24 +371,28 @@ private[transport] class ThrottledAssociation( if (inboundThrottleMode == Blackhole) { // Do nothing } else { - if (queue.isEmpty) { + if (throttledMessages.isEmpty) { val tokens = payload.length val (newbucket, success) = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), tokens) if (success) { inboundThrottleMode = newbucket upstreamListener notify InboundPayload(payload) } else { - queue = queue.enqueue(payload) - - context.system.scheduler.scheduleOnce( - inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens).nanos, self, Dequeue) + throttledMessages = throttledMessages.enqueue(payload) + scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens)) } } else { - queue = queue.enqueue(payload) + throttledMessages = throttledMessages.enqueue(payload) } } } + def scheduleDequeue(delay: FiniteDuration): Unit = inboundThrottleMode match { + case Blackhole ⇒ // Do nothing + case _ if delay <= Duration.Zero ⇒ self ! Dequeue + case _ ⇒ setTimer(DequeueTimerName, Dequeue, delay, repeat = false) + } + } private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef) diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala index c91007b479..bda2e0a517 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala @@ -18,77 +18,77 @@ class ThrottleModeSpec extends AkkaSpec { } "in tokenbucket mode allow consuming tokens up to capacity" in { - val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 100) - val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 100) + val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10) bucket1 must be(TokenBucket(100, 100, 0, 90)) success1 must be(true) - val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = 0L, 40) + val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = 0L, 40) bucket2 must be(TokenBucket(100, 100, 0, 50)) success2 must be(true) - val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 0L, 50) + val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 0L, 50) bucket3 must be(TokenBucket(100, 100, 0, 0)) success3 must be(true) - val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 0, 1) + val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 0, 1) bucket4 must be(TokenBucket(100, 100, 0, 0)) success4 must be(false) } "accurately replenish tokens" in { - val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 0) - val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 0) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 0) + val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 0) bucket1 must be(TokenBucket(100, 100, 0, 0)) success1 must be(true) - val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 0) + val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 0) bucket2 must be(TokenBucket(100, 100, halfSecond, 50)) success2 must be(true) - val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 0) + val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 0) bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 100)) success3 must be(true) - val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 0) + val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 0) bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 100)) success4 must be(true) } "accurately interleave replenish and consume" in { - val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20) - val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20) + val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10) bucket1 must be(TokenBucket(100, 100, 0, 10)) success1 must be(true) - val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 60) + val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 60) bucket2 must be(TokenBucket(100, 100, halfSecond, 0)) success2 must be(true) - val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 40) + val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 40) bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 10)) success3 must be(true) - val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 70) + val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 70) bucket4 must be(TokenBucket(100, 100, 2 * halfSecond, 10)) success4 must be(false) } "allow oversized packets through by loaning" in { - val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20) - val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 30) + val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20) + val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 30) bucket1 must be(TokenBucket(100, 100, 0, 20)) success1 must be(false) - val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 110) + val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 110) bucket2 must be(TokenBucket(100, 100, halfSecond, -40)) success2 must be(true) - val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 20) + val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 20) bucket3 must be(TokenBucket(100, 100, halfSecond, -40)) success3 must be(false) - val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 20) + val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 20) bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 40)) success4 must be(true) }