make maxAllowedWait timeout be dilated #21996

This commit is contained in:
ortigali 2016-12-19 12:02:32 +05:00
parent 2cb912d9ef
commit 3c5223650d
2 changed files with 8 additions and 8 deletions

View file

@ -5,7 +5,6 @@ package docs.stream.javadsl.cookbook;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.*; import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.pf.ReceiveBuilder; import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS; import akka.pattern.PatternsCS;
import akka.stream.*; import akka.stream.*;
@ -18,7 +17,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.PartialFunction; import scala.PartialFunction;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit; import scala.runtime.BoxedUnit;
@ -121,11 +119,12 @@ public class RecipeGlobalRateLimit extends RecipeTest {
private void releaseWaiting() { private void releaseWaiting() {
final List<ActorRef> toBeReleased = new ArrayList<>(permitTokens); final List<ActorRef> toBeReleased = new ArrayList<>(permitTokens);
for (int i = 0; i < permitTokens && i < waitQueue.size(); i++) { for (Iterator<ActorRef> it = waitQueue.iterator(); permitTokens > 0 && it.hasNext();) {
toBeReleased.add(waitQueue.remove(i)); toBeReleased.add(it.next());
it.remove();
permitTokens --;
} }
permitTokens -= toBeReleased.size();
toBeReleased.stream().forEach(ref -> ref.tell(MAY_PASS, self())); toBeReleased.stream().forEach(ref -> ref.tell(MAY_PASS, self()));
if (permitTokens > 0) { if (permitTokens > 0) {
context().become(open()); context().become(open());
@ -186,7 +185,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
} }
}; };
final FiniteDuration twoSeconds = Duration.create(2, TimeUnit.SECONDS); final FiniteDuration twoSeconds = (FiniteDuration) dilated(Duration.create(2, TimeUnit.SECONDS));
final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.probe(system); final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.probe(system);
final TestSubscriber.Probe<String> probe = final TestSubscriber.Probe<String> probe =

View file

@ -5,6 +5,7 @@ import akka.actor.{ Props, ActorRef, Actor }
import akka.stream.ClosedShape import akka.stream.ClosedShape
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit._
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -94,8 +95,8 @@ class RecipeGlobalRateLimit extends RecipeSpec {
// Use a large period and emulate the timer by hand instead // Use a large period and emulate the timer by hand instead
val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter") val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
val source1 = Source.fromIterator(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds)) val source1 = Source.fromIterator(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds.dilated))
val source2 = Source.fromIterator(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds)) val source2 = Source.fromIterator(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds.dilated))
val probe = TestSubscriber.manualProbe[String]() val probe = TestSubscriber.manualProbe[String]()