19958: Add tests for throttle reuse
This commit is contained in:
parent
a0515780db
commit
8459bfaef2
3 changed files with 68 additions and 1 deletions
|
|
@ -0,0 +1,50 @@
|
|||
package akka.stream.javadsl;
|
||||
|
||||
import akka.Done;
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.function.Function;
|
||||
import akka.stream.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import akka.testkit.AkkaSpec;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FlowThrottleTest extends StreamTest {
|
||||
public FlowThrottleTest() {
|
||||
super(actorSystemResource);
|
||||
}
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf());
|
||||
|
||||
@Test
|
||||
public void mustWorksForTwoStreams() throws Exception {
|
||||
final Flow<Integer, Integer, NotUsed> sharedThrottle =
|
||||
Flow.of(Integer.class)
|
||||
.throttle(1, FiniteDuration.create(1, TimeUnit.DAYS), 1, ThrottleMode.enforcing());
|
||||
|
||||
CompletionStage<List<Integer>> result1 =
|
||||
Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer);
|
||||
|
||||
// If there is accidental shared state then we would not be able to pass through the single element
|
||||
assertEquals(result1.toCompletableFuture().get(3, TimeUnit.SECONDS), Collections.singletonList(1));
|
||||
|
||||
// It works with a new stream, too
|
||||
CompletionStage<List<Integer>> result2 =
|
||||
Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer);
|
||||
|
||||
assertEquals(result2.toCompletableFuture().get(3, TimeUnit.SECONDS), Collections.singletonList(1));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -46,6 +46,23 @@ class FlowThrottleSpec extends AkkaSpec {
|
|||
.cancel() // We won't wait 100 days, sorry
|
||||
}
|
||||
|
||||
"work if there are two throttles in different streams" in Utils.assertAllStagesStopped {
|
||||
val sharedThrottle = Flow[Int].throttle(1, 1.day, 1, Enforcing)
|
||||
|
||||
// If there is accidental shared state then we would not be able to pass through the single element
|
||||
Source.single(1)
|
||||
.via(sharedThrottle)
|
||||
.via(sharedThrottle)
|
||||
.runWith(Sink.seq).futureValue should ===(Seq(1))
|
||||
|
||||
// It works with a new stream, too
|
||||
Source.single(2)
|
||||
.via(sharedThrottle)
|
||||
.via(sharedThrottle)
|
||||
.runWith(Sink.seq).futureValue should ===(Seq(2))
|
||||
|
||||
}
|
||||
|
||||
"emit single element per tick" in Utils.assertAllStagesStopped {
|
||||
val upstream = TestPublisher.probe[Int]()
|
||||
val downstream = TestSubscriber.probe[Int]()
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import scala.compat.java8.FutureConverters._
|
|||
* a Reactive Streams `Publisher` (at least conceptually).
|
||||
*/
|
||||
final class Source[+Out, +Mat](private[stream] override val module: Module)
|
||||
extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {
|
||||
extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {
|
||||
|
||||
override type Repr[+O] = Source[O, Mat @uncheckedVariance]
|
||||
override type ReprMat[+O, +M] = Source[O, M]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue